flink kinesis consumer未存储最后成功处理的序列号

vuv7lop3  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(249)

我们使用flink kinesis consumer将kinesis流中的数据消费到flink应用程序中。
kcl库使用dynamodb表来存储最后一个成功处理的kinesis流序列号,这样下次应用程序启动时,它就会从停止的位置恢复。
但是,似乎flink kinesis消费者在任何持久存储中都不维护任何这样的序列号。因此,我们需要依赖sharditeratortype(trim\u horizen、latest等)来决定在应用程序重新启动时恢复flink应用程序处理的位置。
一个可能的解决方案是依赖于flink检查点机制,但这只在应用程序失败后恢复时起作用,而不是在应用程序被故意取消并且需要从最后一个成功使用的kinesis流序列号重新启动时起作用。
我们需要自己存储这些最后成功消费的序列号吗?

5fjcxozz

5fjcxozz1#

为了补充david的回答,我想解释一下不存储序列号背后的原因。
提交到源系统中的任何类型的偏移都会将检查点/保存点功能限制为容错。也就是说,只有最新的检查点/保存点才能恢复。
然而,flink实际上支持跳回到以前的检查点/保存点。考虑应用程序升级。在升级之前,先创建一个保存点,让它运行几分钟,然后创建几个检查点。然后,您发现了一个关键的bug。您希望回滚到已获取的保存点并放弃所有检查点。
现在,如果flink只将源偏移提交给源系统,我们将无法重放从现在到恢复的保存点之间的数据。所以,正如david所指出的,flink需要将偏移量存储在savepoint本身中。在这一点上,另外提交到源系统并不会产生任何好处,而且在恢复到以前的保存点/检查点时会造成混乱。
你认为额外存储偏移量有什么好处吗?

jhdbpxl9

jhdbpxl92#

flink的最佳实践是使用检查点和保存点,因为这些检查点和保存点创建了一致的快照,其中包含消息队列中的偏移量(在本例中为kinesis流序列号)以及作业图其余部分中的所有状态,这些状态是由于将数据消耗到这些偏移量而导致的。这样就可以在不丢失或重复数据的情况下恢复或重新启动。
flink的检查点是flink为从故障中恢复而自动拍摄的快照,并且是为快速恢复而优化的格式。保存点使用相同的底层快照机制,但是是手动触发的,它们的格式更关心操作灵活性而不是性能。
保存点就是你要找的。特别是,使用保存点取消和从保存点恢复非常有用。
另一个选项是将保留的检查点与externalizedcheckpointcleanup.retain\u on\u取消一起使用。

相关问题