kafka——消费者的阅读量正好是流的一半

von4xj4u  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(373)

我使用下面的代码来读取主题的数据,即“sha-test2”,但它读取的是完全不同的代码行,即20行中的10行。但当我运行console时,它会显示所有20行。即。bin/kafka-console-consumer.sh--缩放器localhost:2181 --topic sha-test2——从头开始
我怎么了?非常感谢你的帮助。

public class KafkaTestConsumer extends  Thread {
    //final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "sha-test2";
    ConsumerConnector consumerConnector;

    public static void main(String[] argv) throws   
     UnsupportedEncodingException {
        KafkaTestConsumer helloKafkaConsumer = new KafkaTestConsumer();
        helloKafkaConsumer.start();
    }
    public KafkaTestConsumer(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect","172.23.32.35:2181");
        properties.put("group.id","test-group");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = 
         Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =  
         consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(TOPIC).get(0);
        System.out.println("consumerMap : \n " + consumerMap.toString() );
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

       System.out.println("run started");
        while(it.hasNext()){
            System.out.println(new String(it.next().message()));
        }
}

Thank you.
~Shyam
7kqas0il

7kqas0il1#

问题在于:

topicCountMap.put(TOPIC, new Integer(1));

你告诉警察 consumerConnector 为主题创建单个使用者线程,但主题(显然)有两个分区。中的使用者线程数 "test-group" 组应该等于或大于分区数,否则某些分区将不会被组读取,这正是您的情况。
请看这个示例,其中线程数是通过命令行参数设置的。
或者,您可以从zookeeper中读取存储其元数据的分区的确切数目 /brokers/topics/your_topic_name/partitions 节点。

tuwxkamq

tuwxkamq2#

你的代码看起来很好。这看起来像是一个抵消问题。高级消费者将其补偿存储在zookeeper中。
在你的情况下,这就是happened:- 1. 你在Kafka2中放了10条信息。你运行了消费代码,它成功地读取了所有10条信息。同时,消费者在zookeeper中将消耗的偏移量更新为10。三。你阻止你的消费者。4你又给Kafka发了10条信息。您再次启动消费代码。它只读取最后10条消息,而不是之前推送的10条消息,因为当您重新启动consumer时,它将检查zookeeper以找出从哪个偏移量恢复消费。
请尝试使用不同的组id重新运行您的使用者,或在从zookeeper中删除偏移量后重试。它应该很好用。

properties.put("group.id","test-group420");

相关问题