java中的多线程ApacheKafka消费程序

qcuzuvrc  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(287)

我是新来的 Apache kafka . 我的任务是编写一个使用者来使用来自主题的消息。以下是我的疑问:
1) 我应该通过显式地提供主题名称(作为参数传递给main方法)来告诉我的消费者java程序,还是有任何方法可以自动发现主题?
2) 我在某个地方读到,消费者程序应该是多线程的,在浏览了这个链接之后https://cwiki.apache.org/confluence/display/kafka/consumer+group+example,我认为这是创造了从一个主题阅读线程数。我认为多线程意味着我的消费程序应该产生和主题一样多的线程。如果我必须阅读3个主题的信息,那么我是否需要为每个主题启动3个消费者程序?我不能在单个程序中执行相同的操作吗?请澄清。
3) 我读到有两种类型的消费者计划: Simple and High level . 不知道该写哪一个。
请帮助我开始工作。如果提供很少的源代码将不胜感激。
如果我不能把事情说清楚,我很抱歉。

8yoxcaq7

8yoxcaq71#

1) 从kafka集群获取主题列表可能是一项棘手的工作,因为没有这样的直接api可用于此。。最好的办法是查询zookeeper,因为kafka使用它来维护它的所有状态,这个链接解释了zookeeper的数据结构。您可以阅读此链接进行类似的讨论
2) 该示例假设您将使用它从给定的主题中获取数据,您可以尝试创建一个主题列表并循环查看它们的运行情况,例如

for(String topic : topicList) {
         new ConsumerGroupExample(zooKeeper, groupId, topic).run(threads);
     }

但不确定这个的确切结果,如果你能分享你的发现就好了
3) 取决于基于用例对主题的控制程度。高级别做了大量的簿记和错误处理,但不允许对主题进行细粒度访问,例如多次阅读同一条消息,使用特定的消息子集等,因为所有这些功能都是由简单的使用者提供的,但在这种情况下,您需要处理主题偏移管理之类的情况,经纪人和领导者检测等。
你应该在他们的consmer文档中详细查看一下,以了解他们对同一问题的理解。

相关问题