当使用空闲分区从kafka主题进行消费时,水印生成在本地/独立模式下的工作方式是否不同?

6ie5vjzr  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(466)

我正在考虑这个水印:

class MyWatermarker(val maxTimeLag: Long = 0)
    extends AssignerWithPeriodicWatermarks[MyEvent] {
  var maxTs: Long = 0

  override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
    val timestamp = e.timestamp
    maxTs = maxTs.max(timestamp)
    timestamp
  }

  override def getCurrentWatermark: WatermarkOld = {
    println(s"event watermark: ${maxTs - maxTimeLag}")
    new WatermarkOld(maxTs - maxTimeLag)
  }

底层事件来自kafka源,然后交给流程函数。实现与问题无关,我只分享相关的一点:

override def processElement(
    event: MyEvent,
    ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
    out: Collector[StreamEvent]
  ): Unit = {
    println(
      s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
    )
  ...
  }

当我使用具有空闲分区的kafka源主题在真正的kubernetes集群上运行此应用程序时,水印将按预期保留为0:

In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0

我还可以看到watermarker中生成的日志:

event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0

有趣的是,当我在intellij上本地运行同一个应用程序,并且为同一主题设置了空闲的kafka分区时,我还从watermarker获取了上述日志,其中水印在0和最新接收元素的ts之间摆动,因为 maxLag = 0 . 然而,对我来说很出乎意料的是,process函数的日志显示水印仍在前进:

In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618

为什么会这样?仅供参考,我正在使用flink1.10,将环境并行性设置为2,并在这两种情况下使用事件时间语义。

gpnt7bae

gpnt7bae1#

如果要使用每个分区的水印,可以直接在flink kafka使用者[1]上调用assigntimestampsandwatermarks,那么我相信空闲分区会一直保留整个水印。
对于每分区水印,每个kafka源任务将水印分别应用于它所处理的每个分区,然后将每分区水印中的最小值作为它的水印。因此,至少有一个kafka源任务的水印为0,并且假设您在水印之后和process函数之前有一个keyby,这将在process函数中保留水印。
否则,如果将水印应用于kafka源任务的输出,则水印任务是否具有0的水印取决于其对应的kafka源任务是否具有任何非空闲分区。如果将分区分配给示例是不确定的,这可以解释为什么在intellij中看到不同的结果。
请注意,Flink1.11[2]中对空闲源的处理进行了修改,与此相关的错误修复仍在等待[3]。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-消费者和时间戳提取水印发射。
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-有空闲的资源。
[3] https://issues.apache.org/jira/browse/flink-18934

相关问题