kafkaconsumer poll()每次在第一次调用后返回分区大小0

wfveoks0  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(276)

遵循以下文件:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
我正在使用手动偏移量控制,并尝试使用commitsync()方法的一个简单测试用例将偏移量设置为静态值。然后,我想再次调用poll()方法来使用该偏移量。这应该通过while循环来处理:

try {
     while(true) {
         System.out.println("outer loop");
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         System.out.println("partition size: " + records.partitions().size());

         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());

                 long offset = record.offset();
                 if (SOME CONDITION) {
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset+1)));
                 }
             }
         }
     }
 } finally {
   consumer.close();
 }

但是,在第一次poll()调用之后,似乎总是得到大小为0的分区。
在我的输出中,我只看到以下循环:

outer loop
partition size: 0

但是,如果我终止程序并重新运行它,我可以看到commitsync()工作了,它从偏移量6开始读取。我想知道如何做到这一点,而不手动终止和重新运行程序。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题