我有以下代码:
import java.util.Properties
import com.google.gson._
import com.typesafe.config.ConfigFactory
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object WindowedWordCount {
val configFactory = ConfigFactory.load()
def main(args: Array[String]) = {
val brokers = configFactory.getString("kafka.broker")
val topicChannel1 = configFactory.getString("kafka.topic1")
val props = new Properties()
...
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
val partitionedInput = dataStream.keyBy(jsonString => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account")
})
val priceCheck = Pattern.begin[String]("start").where({jsonString =>
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account").toString == "iOS"})
val pattern = CEP.pattern(partitionedInput, priceCheck)
val newStream = pattern.select(x =>
x.get("start").map({str =>
str
})
)
newStream.print()
env.execute()
}
}
出于某种原因在上面的代码中 newStream.print()
什么都没有打印出来。我肯定Kafka中有数据符合我上面定义的模式,但由于某种原因没有打印出来。
有人能帮我找出这个代码中的错误吗?
编辑:
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
val jsonParser = new JsonParser()
val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context")
Instant.parse(context.get("serverTimestamp").toString.replaceAll("\"", "")).toEpochMilli
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis())
}
}
我在Flink医生上看到你可以回来 prevElementTimestamp
在 extractTimestamp
方法(如果您使用的是kafka010)和 new Watermark(System.currentTimeMillis)
在 getCurrentWatermark
方法。
但我不明白什么 prevElementTimestamp
是或者为什么一个人会回来 new Watermark(System.currentTimeMillis)
作为水印而不是其他东西。你能详细说明我们为什么要这样做吗 WaterMark
以及 EventTime
一起工作好吗?
1条答案
按热度按时间8ljdwjyq1#
你把你的工作安排在
EventTime
,但不提供时间戳和水印提取器。有关在事件时间中工作的更多信息,请参阅这些文档。如果你想使用Kafka嵌入的时间戳这个文件可能会帮助你。
在
EventTime
cep库在水印到达时缓冲事件,以便正确处理无序事件。在您的例子中,没有生成水印,因此事件被无限缓冲。编辑:
对于
prevElementTimestamp
我觉得文件很清楚:使用Kafka的时间戳时,不需要定义时间戳提取器。extracttimestamp()方法的previouselementtimestamp参数包含kafka消息携带的时间戳。
因为kafka 0.10.x kafka消息可以嵌入时间戳。
生成
Watermark
作为new Watermark(System.currentTimeMillis)
在这种情况下不是个好主意。你应该创造Watermark
基于你对数据的了解。了解如何Watermark
以及EventTime
一起工作我再清楚不过了