我用的是 KafkaConsumer
使用以下设置 enable.auto.commit
=
true auto.commit.interval.ms
= 2000
.. 每5秒进行一次投票。我有个假人 ConsumerInterceptor
实施
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
System.out.println("onCommit() invoked with records - " + map.size());
Set<Map.Entry<TopicPartition, OffsetAndMetadata>> committedEntries = map.entrySet();
for (Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committedEntries) {
System.out.println("committedRecordTopic " + committedEntry.getKey().topic());
System.out.println("committedRecordPartition " + committedEntry.getKey().partition());
System.out.println("committedRecordOffset " + committedEntry.getValue().offset());
}
}
我给Kafka发唱片的时候 onCommit
方法被反复调用,我一次又一次地看到这种输出(根据上面的拦截器)
onCommit() invoked with records - 1
committedRecordTopic foo
committedRecordPartition 0
committedRecordOffset 12
为什么会这样?如果最后一张唱片被提交了,为什么 onCommit()
用同一条记录调用?当我切换到手动提交时不会发生这种情况 enable.auto.commit
= false
打电话给我 commitSync()
在消费逻辑中
暂无答案!
目前还没有任何答案,快来回答吧!