Consuming multiple topics concurrently as a Kafka consumer

Posted by vq8itlhq on 2021-06-08 in Kafka

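The example at https://cwiki.apache.org/confluence/display/kafka/consumer+group+example shows how to consume a Kafka topic concurrently from several threads.
In the section on creating the thread pool, it has the following code: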

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}

I can add more topics to topicCountMap. For example:

topicCountMap.put("channel1", new Integer(a_numThreads));
topicCountMap.put("channel2", new Integer(a_numThreads));
topicCountMap.put("channel3", new Integer(a_numThreads));

In the code above, however, the streams object appears to map to only one of the topics:

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

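I'm not entirely sure how to create multiple stream lists, one per topic, and then iterate over them so that each channel's streams are submitted to the executor for consumption.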

Answer #1, by epfja78i

Suppose you have:

String topic1 = "channel1";
String topic2 = "channel2";
String topic3 = "channel3";

Then you can indeed do:

topicCountMap.put(topic1, new Integer(a_numThreads_topic1));
topicCountMap.put(topic2, new Integer(a_numThreads_topic2));
topicCountMap.put(topic3, new Integer(a_numThreads_topic3));

Once you have consumerMap (that code does not change), you can retrieve the streams for each topic:

List<KafkaStream<byte[], byte[]>> topic1_streams = consumerMap.get(topic1);
List<KafkaStream<byte[], byte[]>> topic2_streams = consumerMap.get(topic2);
List<KafkaStream<byte[], byte[]>> topic3_streams = consumerMap.get(topic3);

To consume from the streams, create one executor per topic, each sized to that topic's thread count:

ExecutorService executors_topic1 = Executors.newFixedThreadPool(a_numThreads_topic1);
ExecutorService executors_topic2 = Executors.newFixedThreadPool(a_numThreads_topic2);
ExecutorService executors_topic3 = Executors.newFixedThreadPool(a_numThreads_topic3);

Finally:

int threadNumber = 0;
for (final KafkaStream stream : topic1_streams) {
    executors_topic1.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}
for (final KafkaStream stream : topic2_streams) {
    executors_topic2.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}
for (final KafkaStream stream : topic3_streams) {
    executors_topic3.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}
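
Each of the pools above holds non-daemon threads, so every one of them has to be shut down when consumption stops (with the old high-level consumer you would normally call consumer.shutdown() first so that the stream iterators end). A minimal sketch of shutting several pools down together, using only java.util.concurrent; the class name PoolShutdown, the method shutdownAll, and the shared timeout are assumptions made for illustration, not part of the wiki example:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class PoolShutdown {

    // Stops every pool from accepting new tasks, then waits up to
    // timeoutMillis in total for the running tasks to finish.
    // Returns true only if all pools terminated within the timeout.
    static boolean shutdownAll(List<ExecutorService> pools, long timeoutMillis) {
        for (ExecutorService pool : pools) {
            pool.shutdown();
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        boolean allDone = true;
        try {
            for (ExecutorService pool : pools) {
                long remaining = Math.max(0L, deadline - System.nanoTime());
                if (!pool.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                    // Still busy after the deadline: interrupt its worker threads.
                    pool.shutdownNow();
                    allDone = false;
                }
            }
        } catch (InterruptedException e) {
            for (ExecutorService pool : pools) {
                pool.shutdownNow();
            }
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            allDone = false;
        }
        return allDone;
    }
}
```

With the names used in this answer, the call would look like PoolShutdown.shutdownAll(Arrays.asList(executors_topic1, executors_topic2, executors_topic3), 5000).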

Of course, this is only meant to give you the idea; the code can clearly be improved.
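
One possible improvement is to fold the three copies of each step into a single loop over the topic map. The sketch below is not the Kafka API: MultiTopicFanOut, consumeAll, and drain are hypothetical names, a List<String> stands in for a KafkaStream, and drain stands in for ConsumerTest, so that it runs with the JDK alone. It also uses a single pool sized to the total number of streams instead of one pool per topic, which is a design choice rather than something this answer prescribes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class MultiTopicFanOut {

    // Hypothetical stand-in for ConsumerTest: reads one stream to the end
    // and reports how many messages it saw.
    static int drain(String topic, int threadNumber, List<String> stream) {
        int seen = 0;
        for (String message : stream) {
            System.out.println(topic + " / thread " + threadNumber + ": " + message);
            seen++;
        }
        return seen;
    }

    // Submits one task per stream, for every topic, to a single shared pool.
    // Returns the number of messages consumed per topic.
    static Map<String, Integer> consumeAll(Map<String, List<List<String>>> streamsByTopic) {
        int totalStreams = 0;
        for (List<List<String>> streams : streamsByTopic.values()) {
            totalStreams += streams.size();
        }
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, totalStreams));
        try {
            Map<String, List<Future<Integer>>> pending = new LinkedHashMap<>();
            int threadNumber = 0;
            for (Map.Entry<String, List<List<String>>> entry : streamsByTopic.entrySet()) {
                String topic = entry.getKey();
                List<Future<Integer>> futures = new ArrayList<>();
                for (List<String> stream : entry.getValue()) {
                    final int id = threadNumber++;
                    Callable<Integer> task = () -> drain(topic, id, stream);
                    futures.add(executor.submit(task));
                }
                pending.put(topic, futures);
            }
            // The stand-in streams are finite, so waiting on the futures terminates.
            Map<String, Integer> counts = new LinkedHashMap<>();
            for (Map.Entry<String, List<Future<Integer>>> entry : pending.entrySet()) {
                int sum = 0;
                for (Future<Integer> future : entry.getValue()) {
                    sum += future.get();
                }
                counts.put(entry.getKey(), sum);
            }
            return counts;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a consumer task failed", e);
        } finally {
            executor.shutdown();
        }
    }
}
```

With the real consumer, the outer loop would iterate over consumerMap.entrySet() and submit new ConsumerTest(stream, threadNumber) for each stream. Those tasks block while waiting for messages, so you would not wait on their futures the way this finite stand-in does.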
