我的flink计划如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
val partitionedInput = dataStream.keyBy(jsonString => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account")
})
val pattern = Pattern.begin[String]("start").where(jsonString =>
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account") == "iOS") //ERROR HERE
val patternStream = CEP.pattern(partitionedInput, pattern)
我的电脑出了个错误 val pattern = ...
台词 Expected IterativeCondition[String], actual: (Nothing) => Unit
.
我的 dataStream
由我在 keyBy
通过json对象内的account键进行设置。然后,我试图创建一个模式,但我得到一个错误时,创建模式。
1条答案
按热度按时间67up9zun1#
确保使用正确的api。对于scala,您应该导入
而不是