我正在考虑这个水印:
class MyWatermarker(val maxTimeLag: Long = 0)
extends AssignerWithPeriodicWatermarks[MyEvent] {
var maxTs: Long = 0
override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = e.timestamp
maxTs = maxTs.max(timestamp)
timestamp
}
override def getCurrentWatermark: WatermarkOld = {
println(s"event watermark: ${maxTs - maxTimeLag}")
new WatermarkOld(maxTs - maxTimeLag)
}
底层事件来自kafka源,然后交给流程函数。实现与问题无关,我只分享相关的一点:
override def processElement(
event: MyEvent,
ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
out: Collector[StreamEvent]
): Unit = {
println(
s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
)
...
}
当我使用具有空闲分区的kafka源主题在真正的kubernetes集群上运行此应用程序时,水印将按预期保留为0:
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0
我还可以看到watermarker中生成的日志:
event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0
有趣的是,当我在intellij上本地运行同一个应用程序,并且为同一主题设置了空闲的kafka分区时,我还从watermarker获取了上述日志,其中水印在0和最新接收元素的ts之间摆动,因为 maxLag = 0
. 然而,对我来说很出乎意料的是,process函数的日志显示水印仍在前进:
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618
为什么会这样?仅供参考,我正在使用flink1.10,将环境并行性设置为2,并在这两种情况下使用事件时间语义。
1条答案
按热度按时间gpnt7bae1#
如果要使用每个分区的水印,可以直接在flink kafka使用者[1]上调用assigntimestampsandwatermarks,那么我相信空闲分区会一直保留整个水印。
对于每分区水印,每个kafka源任务将水印分别应用于它所处理的每个分区,然后将每分区水印中的最小值作为它的水印。因此,至少有一个kafka源任务的水印为0,并且假设您在水印之后和process函数之前有一个keyby,这将在process函数中保留水印。
否则,如果将水印应用于kafka源任务的输出,则水印任务是否具有0的水印取决于其对应的kafka源任务是否具有任何非空闲分区。如果将分区分配给示例是不确定的,这可以解释为什么在intellij中看到不同的结果。
请注意,Flink1.11[2]中对空闲源的处理进行了修改,与此相关的错误修复仍在等待[3]。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-消费者和时间戳提取水印发射。
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-有空闲的资源。
[3] https://issues.apache.org/jira/browse/flink-18934