热释光;dr:目前保证flink事件时间顺序的最佳解决方案是什么?
我使用flink 1.8.0和kafka 2.2.1。我需要通过事件时间戳来保证事件的正确顺序。我每1s生成一个周期性的水印。我使用flinkkafkaconsumer和ascendingtimestampextractor:
val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
override def extractAscendingTimestamp(element: T): Long =
timestampExtractor(element)
})
.addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)
然后处理:
myStream
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)
我意识到,对于无序的事件,在同一毫秒或几毫秒之后发生的,顺序不是由flink纠正的。我在文件中发现:
水印触发所有窗口的计算,其中最大时间戳(即结束时间戳-1)小于新水印
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#interaction-水印和窗口
所以我准备了额外的处理步骤来保证事件的时间顺序:
myStream
.timeWindowAll(Time.milliseconds(100))
.apply((window, input, out: Collector[MyEvent]) => input
.toList.sortBy(_.getTimestamp)
.foreach(out.collect) // this windowing guarantee correct order by event time
)(TypeInformation.of(classOf[MyEvent]))
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)
然而,我发现这个解决方案很难看,它看起来像一个解决办法。我还关心kafkasource的每个分区的水印
理想情况下,我想把秩序的保证,在Kafka资源,并保持它为每个Kafka分区,像每个分区水印。有可能吗?目前保证flink事件时间顺序的最佳解决方案是什么?
2条答案
按热度按时间c8ib6hqw1#
flink不保证按事件时间顺序处理记录。分区内的记录将按其原始顺序进行处理,但当两个或多个分区合并到一个新分区时(由于流的重新分区或合并),flink会将这些分区的记录随机合并到新分区中。其他一切都将是低效的,并导致更高的延迟。
例如,如果您的作业有一个从两个kafka分区读取的源任务,那么两个分区的记录将以一种随机的之字形模式合并。
但是,flink保证所有事件都是针对生成的水印进行正确处理的。这意味着,水印永远不会超过记录。例如,如果kafka源生成每个分区的水印,那么即使合并了多个分区的记录,这些水印仍然有效。水印用于收集和处理时间戳小于水印的所有记录。因此,它确保了输入数据的完整性。
这是按时间戳排序记录的先决条件。你可以用一个翻滚的Windows。但是,你应该意识到
所有窗口都将在单个任务中执行(即,它不是并行的)。如果每个键的顺序足够,那么应该使用常规的滚动窗口,或者更好地实现
KeyedProcessFunction
,效率更高。由于重新分区或更改并行性而重新组织流时,顺序将被破坏。
x0fgdtte2#
这是一个伟大的观点。Kafka资源的秩序保障实际上包括两部分。
保证同一子任务中分区之间的顺序。
保证子任务之间的顺序。
第一部分已经在进行中https://issues.apache.org/jira/browse/flink-12675. 第二部分需要子任务之间共享状态的支持,这可能需要社区中更多的讨论和详细的计划。
回到你的问题,我认为通过设置一个缓冲数据的窗口来保持事件的顺序是目前最好的解决方案。