storm kafkaspout失败的元组重复

pbpqsu0x  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(385)

我正在使用storm-kafka-1.1.1-plus和storm 1.1.1。并使用baserichbolt、一个kafkaspout和两个bolt-a、bolt-b进行配置,一旦bolt-b确认元组将被视为已成功处理的元组并提交,元组将锚定在bolt-a中。但是,问题是由于某种原因,一些失败的消息在kafkaspout中被复制了。
例如
kafkaspout在处理它时发出了1000个元组,由于某种原因,有将近20个元组失败了(在bolt-b)。这20个元组是连续的重放,在某个点上,worker被杀死,而supervisor重新启动worker,这20个元组又是重放,这次它成功地处理了,但处理了多次(重复)。

但是,我希望那些元组只能处理一次(成功)。我已将topology.enable.message.timeouts设置为false。我的另一个问题是,风暴在哪里储存了那些失败的Kafka补偿细节。我在zookeeper上没有找到它,它只有下面的细节。
{“topology”:{“id”:“test\u topology-12-1508938595”,“name”:“test\u topology”},“offset”:505,“partition”:2,“broker”:{“host”:“127.0.0.1”,“port”:9092},“topic”:“test\u topology 1”}

u5rb5r59

u5rb5r591#

禁用消息超时可能会导致消息丢失,如果需要处理所有消息,可能需要重新考虑禁用它。
启用确认时,storm至少提供一次处理保证。您可能想看看是否可以使螺栓幂等,以便重放不会引起问题。或者你可以看看https://storm.apache.org/releases/1.1.1/trident-tutorial.html,它只提供一次状态更新。
编辑:你可能需要重新思考你的问题。据我所知,没有一个流处理系统能提供你想要的一次处理。
trident提供的一次语义是trident将帮助您使状态更新是幂等的,因此从数据存储的Angular 来看,“看起来”消息只处理一次。处理仍至少一次。请参阅上的“事务性喷口”部分(可能还有本页的其余部分)https://storm.apache.org/releases/2.0.0-snapshot/trident-state.html 对于如何工作的直觉。基本思想是在数据存储中存储关于哪些消息已经被写入的信息,这样如果消息被重复,状态更新代码就可以忽略它们。
你可能还想读书https://streaml.io/blog/exactly-once. 我想说的是,flink实现了类似于这里描述的分布式快照算法的东西,这是一种在至少一次系统中精确模拟一次的不同方法。

相关问题