我写Kafka消费高容量高速分布式应用程序。我只有一个主题,但传入的消息率非常高。有多个分区服务于更多的使用者将适合这个用例。最好的消费方式是拥有多个流读取器。根据文档或可用的示例,consumerconnector发出的kafkastreams的数量取决于主题的数量。想知道如何获得多个kafkastream读取器[基于分区],以便我可以在每个流中跨一个线程,或者在多个线程中读取同一个kafkastream将执行多个分区的并发读取?
任何见解都是非常感谢的。
我写Kafka消费高容量高速分布式应用程序。我只有一个主题,但传入的消息率非常高。有多个分区服务于更多的使用者将适合这个用例。最好的消费方式是拥有多个流读取器。根据文档或可用的示例,consumerconnector发出的kafkastreams的数量取决于主题的数量。想知道如何获得多个kafkastream读取器[基于分区],以便我可以在每个流中跨一个线程,或者在多个线程中读取同一个kafkastream将执行多个分区的并发读取?
任何见解都是非常感谢的。
3条答案
按热度按时间1mrurvl11#
想和大家分享一下我在邮件列表中的发现:
在主题Map中传递的数字控制一个主题被划分为多少个流。在您的例子中,如果传入1,那么所有10个分区的数据都将被馈送到1个流中。如果传入2,则2个流中的每个流将从5个分区获取数据。如果传入11,其中10个将分别从1个分区获取数据,1个流将一无所获。
通常,您需要在自己的线程中迭代每个流。这是因为如果没有新事件,每个流都可能永远阻塞。
示例代码段:
参考文献:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3cca+shyy_z903domnjp7_yyr_ae2srw-x7xpanqkmwap66goqf6w@mail.gmail.com%3e
t40tm48m2#
上面的代码将在主题名为“output topic”的分区1处写入记录
dba5bblo3#
建议的方法是拥有一个线程池,这样java就可以为您处理组织,createmessagestreamsbyfilter方法为每个流提供了一个可运行的线程池。例如:
在这个例子中,我要求6个线程,因为我知道每个主题有3个分区,我在白名单中列出了两个主题。一旦我们有了传入流的句柄,我们就可以迭代它们的内容,即messageandmetadata对象。元数据实际上只是主题名和偏移量。正如您所发现的,如果您请求1个流而不是我的示例6中的流,那么您可以在单个线程中执行,但是如果您需要并行处理,那么最好的方法是启动一个执行器,每个返回的流有一个线程。