下面是一个例子https://cwiki.apache.org/confluence/display/kafka/consumer+group+example 关于Kafka主题的同时消费。
在创建线程池部分,他们有以下代码
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
我可以在topiccountmap中添加更多主题。例如,
topicCountMap.put("channel1", new Integer(a_numThreads));
topicCountMap.put("channe2", new Integer(a_numThreads));
topicCountMap.put("channel3", new Integer(a_numThreads));
在上面的代码中,streams对象似乎只Map到其中一个主题
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
我不完全确定如何创建多个流对象,每个流对象Map到给定的主题,然后遍历它们,从每个通道获取数据并将它们提交给executor。
1条答案
按热度按时间epfja78i1#
假设你有:
那么,你确实可以做到:
一旦您获得了consumermap(不会更改的代码),您将能够检索每个主题的流:
要从流中消费,您需要创建适当数量的执行器:
最后:
当然,这只是给你一个想法。显然,代码可以改进。