storm中的java延迟队列实现——kafka、cassandra、redis或beanstalk?

wfsdck30  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(454)

我有一个storm拓扑来处理来自kafka的消息,并根据手头的任务在cassandra中进行http调用/保存。信息一到我就处理。由于来自外部源(如http)的响应,很少有消息没有被完全处理。我想实现一个指数退避机制,以便在http服务器不响应/返回错误消息的情况下重试一段时间。我能想到一些我能实现它们的想法。我想知道他们中的哪一个将是一个更好的解决方案,如果有任何其他解决方案,我可以使用容错。由于这是用来实现指数退避,每个消息将有不同的延迟时间。
发送另一个主题在Kafka,这是消费后。我的首选方案。我知道我们可以使用Kafka偏移量,所以在后期使用消息。我怎么也找不到文档/示例代码来做同样的事情。如果有人能帮我解决这个问题,那真的很有帮助。
编写消息cassandra/redis,并编写一个调度程序来获取尚未处理的消息,并将其发送给kafka,以便我的storm拓扑可以使用它(其他遗留项目中的现有解决方案(非storm))
延迟发送到beanstalk(其他遗留项目(非storm)中的现有解决方案)。我多么希望避免使用此解决方案,并且仅在我无法选择的情况下使用它)。
而这正是我想做的。我找不到实现delayprocessinguntil的文档,正如kafka中提到的,使用高级使用者的延迟队列实现
我以前用beanstalk做过数据存储和延迟的预定工作,但我更喜欢用kafka。

2j4z5cfb

2j4z5cfb1#

我认为您的用例描述了对数据库而不是队列的需求。您希望临时存储记录,直到它们出现,然后删除它们,这样它们就不会出现在以后的搜索中。如你的分析所示,在队列中尝试这样做充其量也会很尴尬。
我建议您在cassandra中创建另一个列族来保存这些延迟的请求。您将存储请求本身以及重试时间。您是否还想拥有失败的http尝试和相关数据的时间序列取决于您自己。当延迟请求最终完成时,您需要从cf中删除相应的行。对延迟请求的搜索也很简单。
当然,任何数据库,甚至本地驱动器或hdfs中的文件都可以工作。

xuo3flqw

xuo3flqw2#

你可能会对Kafka重试项目感兴趣https://github.com/ibm/kafka-retry. 它使用单个重试主题提供延迟重试队列。

o8x7eapl

o8x7eapl3#

Kafka喷口有一个指数退避消息重试内置。您可以通过喷口配置来配置初始延迟、延迟倍增和最大延迟。如果bolt中有错误,可以调用collector.fail(input)。在那之后,你只需让它喷做重试。
https://github.com/apache/storm/blob/v0.10.0/external/storm-kafka/src/jvm/storm/kafka/exponentialbackoffmsgretrymanager.java

相关问题