如何处理Kafka分区数大于Flink并行度时的水印情况

2cmtqfgy  于 2023-02-20  发布在  Apache
关注(0)|答案(1)|浏览(273)

我试图找出一个解决方案,以解决当Kafka分区的数量大于Flink并行性时水印进度的问题。
例如,我有一个并行度为3的Flink应用,它需要从5个Kafka分区读取数据。我的问题是,当启动Flink应用时,它必须使用这些分区中的历史数据。据我所知,每个Flink任务都开始使用相应分区中的事件(可能缓冲了大量的事件)以及在同一任务转换到另一分区之前的进度事件时间(因此是水印),根据已经发布的水印,该另一分区现在将具有陈旧数据。
我尝试考虑使用几秒水印对齐的水印策略,但这并不能解决问题,因为历史数据会立即从一个分区中消耗掉,因此事件时间/水印会提前。以下是一段代码,展示了所实现的水印策略。

WatermarkStrategy.forGenerator(ws)
        .withTimestampAssigner(
            (event, timestamp) -> (long) event.get("event_time))
        .withIdleness(IDLENESS_PERIOD)
        .withWatermarkAlignment(
            GROUP,
            Duration.ofMillis(DEFAULT_MAX_WATERMARK_DRIFT_BETWEEN_PARTITIONS),
            Duration.ofMillis(DEFAULT_UPDATE_FOR_WATERMARK_DRIFT_BETWEEN_PARTITIONS));

我还尝试使用下游操作符对事件进行排序,如这里所述Sorting union of streams to identify user sessions in Apache Flink,但这也不能有效地解决我的问题,因为事件记录时间可能会显著偏离。
我该如何解决这个问题呢?我是否需要拥有与Kafka分区数量相同的Flink任务,或者我遗漏了从Kafka分区读取数据的方式

0pizxfdo

0pizxfdo1#

此问题最简单的解决方案是将fromSourceWatermarkStrategy一起使用,而不是使用assignTimestampsAndWatermarks对其进行赋值。
当您直接在带有Kafka连接器的fromSource中使用WatermarkStrategy时,水印将是分区感知的,因此由给定操作符生成的水印将是分配给此操作符的所有分区的最小值。
直接在源代码中分配水印将解决您所面临的问题,但它有一个主要缺点,因为生成的水印由给定的操作符处理的所有分区中的min,如果一些分区是空闲水印,则此操作符也不会进行。
文档描述Kafka连接器水印在这里。

相关问题