ApacheZooKeeper—高级kafka使用者只消耗生产者发送的消息的一半

r3i60tvu  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(175)

我已经按照示例页面实现了一个高级使用者:https://cwiki.apache.org/confluence/display/kafka/consumer+group+example
当代码运行时,它只消耗生成的消息的一半。我有一个基本的3节点zookeeper集群和2个kafka代理。当我运行简单的消费者代码(不是高级消费者)时,所有消息都被消费掉了。任何想法都将不胜感激。
消费者代码

public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", 2);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test");

        executor = Executors.newFixedThreadPool(2);

        int threadNumber = 0;

        for (final KafkaStream stream : streams) {

            executor.submit(new Consumer(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181");
        props.put("group.id", "Consumers");
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("enable.auto.commit", "true");
        props.put("zookeeper.sync.time.ms", "1000");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.commit.interval.ms", "500");

        return new ConsumerConfig(props);
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题