使用keyedcoprocessfunction和FlinkKafka的读取顺序的flink超时

sqxo8psd  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(644)

我用的是 KeyedCoProcessFunction 类来实现类似超时的用例。场景如下:我有一个输入kafka主题和一个输出kafka主题,服务从输入主题读取并处理它(时间可变),然后在输出kafka主题中发布响应。
现在要实现超时(必须使用flink数据流API),我有一个 FlinkKafkaConsumer 从Kafka输入主题中读取,以及另一个 FlinkKafkaConsumer 从Kafka输出主题中读取。我正在连接两条流,并使用 processElement1 我正在注册一个计时器并等待 onTimer 方法,或者 processElement2 在此之前触发,因此我删除计时器,并且不声明超时。
在测试大型活动时,我看到 NULLPTREXCEPTION 我怀疑 processElement2 之前被解雇 ProcessElement1 ,在上述情况下,由于任何原因(知道服务处理元素所花费的时间可能需要几秒钟),从输出主题读取元素的场景会在从输入主题读取之前发生吗?在这种情况下,实现上述超时功能的最佳情况是什么使用flink数据流API,有什么提示吗?
谢谢您。

67up9zun

67up9zun1#

是的,不能保证Flink会打电话给 processElement1 之前的方法 processElement2 . 有可能,但不确定。或者换言之,情况可能并不总是这样。
这与你在Flink训练中的longridealerts练习中发现的情况完全相同--https://github.com/apache/flink-training/tree/master/long-ride-alerts --所以你可以模仿那里使用的解决方案。逻辑是这样的:
无论哪个事件(对于给定的密钥)首先到达,都从任一流中存储它
如果首先到达的是输入事件(来自topic1),请设置计时器
如果是第二个到达的输出事件(来自topic2),则在计时器触发之前,删除计时器
无论哪个事件(对于给定的密钥)在第二个到达,从任一流中清除状态(由两个流共享)
每当计时器启动时,使用从topic1保存的输入事件创建一个报告并清除状态
每当topic1中的输入事件丢失时,此解决方案不会泄漏状态。如有必要,请参阅培训练习附带的讨论,了解如何处理该问题(简短版本:使用状态ttl)。

相关问题