如何在flink中使用模式匹配where子句?

ymdaylpp  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(475)

我的flink计划如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))

val partitionedInput = dataStream.keyBy(jsonString => {
  val jsonParser = new JsonParser()
  val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
  jsonObject.get("account")
})

val pattern = Pattern.begin[String]("start").where(jsonString => 
            val jsonParser = new JsonParser()
            val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
            jsonObject.get("account") == "iOS") //ERROR HERE

val patternStream = CEP.pattern(partitionedInput, pattern)

我的电脑出了个错误 val pattern = ... 台词 Expected IterativeCondition[String], actual: (Nothing) => Unit .
我的 dataStream 由我在 keyBy 通过json对象内的account键进行设置。然后,我试图创建一个模式,但我得到一个错误时,创建模式。

67up9zun

67up9zun1#

确保使用正确的api。对于scala,您应该导入

import org.apache.flink.cep.scala.pattern.Pattern

而不是

import org.apache.flink.cep.pattern.Pattern

相关问题