遵循以下文件:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
我正在使用手动偏移量控制,并尝试使用commitsync()方法的一个简单测试用例将偏移量设置为静态值。然后,我想再次调用poll()方法来使用该偏移量。这应该通过while循环来处理:
try {
while(true) {
System.out.println("outer loop");
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
System.out.println("partition size: " + records.partitions().size());
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
long offset = record.offset();
if (SOME CONDITION) {
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset+1)));
}
}
}
}
} finally {
consumer.close();
}
但是,在第一次poll()调用之后,似乎总是得到大小为0的分区。
在我的输出中,我只看到以下循环:
outer loop
partition size: 0
但是,如果我终止程序并重新运行它,我可以看到commitsync()工作了,它从偏移量6开始读取。我想知道如何做到这一点,而不手动终止和重新运行程序。
暂无答案!
目前还没有任何答案,快来回答吧!