我已经按照示例页面实现了一个高级使用者:https://cwiki.apache.org/confluence/display/kafka/consumer+group+example
当代码运行时,它只消耗生成的消息的一半。我有一个基本的3节点zookeeper集群和2个kafka代理。当我运行简单的消费者代码(不是高级消费者)时,所有消息都被消费掉了。任何想法都将不胜感激。
消费者代码
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test");
executor = Executors.newFixedThreadPool(2);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new Consumer(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181");
props.put("group.id", "Consumers");
props.put("zookeeper.session.timeout.ms", "10000");
props.put("enable.auto.commit", "true");
props.put("zookeeper.sync.time.ms", "1000");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.commit.interval.ms", "500");
return new ConsumerConfig(props);
}
暂无答案!
目前还没有任何答案,快来回答吧!