kafka-动态/任意分区

jrcvhitl  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(409)

我正在为Kafka主题建立一个消费者服务。每条消息都包含一个url,我的服务将向其发出http请求。每个消息/url完全独立于其他消息/url。
我担心的问题是如何处理长时间运行的请求。有些http请求可能需要50多分钟才能返回响应。在此期间,我不想再留任何其他信息。
并行化此操作的最佳方法是什么?
我知道Kafka的并行方法是创建分区。然而,从我所读到的内容来看,当我真的想要无限或动态的分区数时,似乎需要预先定义分区数(理想情况下,每个消息都会动态地创建自己的分区)
例如,假设我创建了1000个分区。如果针对我的主题生成了1001+条消息,那么将发出前1000个请求,但之后的每条消息都将排队等待,直到该分区中的前一个请求完成。
我曾考虑过使http请求异步,但后来在确定要提交的偏移量时似乎遇到了问题。
例如,在单个分区上,我可以让使用者读取第一条消息并发出异步请求。它提供了一个回调函数,将该偏移提交给kafka。在该请求等待时,我的使用者读取下一条消息并发出另一个异步请求。如果该请求在第一个请求之前完成,它将提交该偏移量。现在,如果第一个请求由于某种原因失败,或者我的消费进程死亡,会发生什么?如果我已经提交了一个更高的偏移量,听起来这意味着我的第一条消息永远不会被重新处理,这不是我想要的。
在使用kafka进行长时间运行的异步消息处理时,我显然遗漏了一些东西。有没有人遇到过类似的问题,或者有没有想过如何最好地解决这个问题?提前感谢您抽出时间阅读本文。

ldxq2e6h

ldxq2e6h1#

您应该查看apachestorm以了解消费者的处理部分,并将消息存储和检索留给kafka。您所描述的是大数据中一个非常常见的用例(尽管50分钟以上的事情有点极端)。简而言之,您的主题将有少量的分区,并让storm流处理扩展实际发出http请求的组件(storm speak中的“bolt”)的数量。单个喷口(类似于从外部源读取数据的storm组件)可以读取来自kafka主题的消息并将其流式传输到处理螺栓。
我发布了一个如何在github上编写storm/kafka应用程序的开源示例。
以下是对这个答案的一些看法:
1) 虽然我认为storm是正确的平台方法,但是没有理由不编写一个runnable来执行http调用,然后再编写一些代码,让单个kafka消费者读取消息并使用runnable的多线程示例处理它们。所需的管理代码有点有趣,但可能比从头开始学习storm更容易编写。所以您可以通过在更多线程上添加更多的runnable示例来进行扩展。
2) 无论您使用的是storm还是您自己的多线程解决方案,在kafka中仍然存在如何管理偏移量的问题。简单的回答是,你将不得不做自己复杂的抵消管理。你不仅要保存从Kafka那里读到的最后一封邮件的偏移量,还要保存和管理当前正在处理的飞行中邮件的列表。这样,如果你的应用程序宕机,你就知道正在处理哪些消息,你可以在启动备份时检索并重新处理它们。基本kafka偏移量持久性不支持这种更复杂的需求,但它只是为了方便更简单的用例。你可以把你的偏移量信息保存在任何你喜欢的地方(zookeeper,文件系统或任何数据库)。

相关问题