kafkaspout是否多线程

s2j5cfk0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(403)

kafka 0.8.x doc演示了如何在kafka consumer中使用多线程:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
    executor.execute(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}

但Kafka在Storm中似乎并没有多线程。在kafkaspout中可以使用多任务而不是多线程:

builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), nThread);

哪一个更好?谢谢

l3zydbqr

l3zydbqr1#

既然您提到了kafka0.8.x,我就假设您使用的kafkaspout来自storm kafka,而不是storm kafka客户端。
第一个代码片段是高级使用者api,它可以使用多个线程来使用多个分区。
至于Kafka喷口,它可能是相同的,但风暴是使用低层次的消费者,即simpleconsumer。但是,将为每个喷口执行器(任务)创建一个simpleconsumer示例。

相关问题