kafka consumerinterceptor oncommit()方法是否连续调用?

bjg7j2ky  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(192)

我用的是 KafkaConsumer 使用以下设置 enable.auto.commit =
true auto.commit.interval.ms = 2000 .. 每5秒进行一次投票。我有个假人 ConsumerInterceptor 实施

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
    System.out.println("onCommit() invoked with records - " + map.size());
    Set<Map.Entry<TopicPartition, OffsetAndMetadata>> committedEntries = map.entrySet();

    for (Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committedEntries) {    
        System.out.println("committedRecordTopic " + committedEntry.getKey().topic());
        System.out.println("committedRecordPartition " + committedEntry.getKey().partition());
        System.out.println("committedRecordOffset " + committedEntry.getValue().offset());
    }

}

我给Kafka发唱片的时候 onCommit 方法被反复调用,我一次又一次地看到这种输出(根据上面的拦截器)

onCommit() invoked with records - 1
committedRecordTopic foo
committedRecordPartition 0
committedRecordOffset 12

为什么会这样?如果最后一张唱片被提交了,为什么 onCommit() 用同一条记录调用?当我切换到手动提交时不会发生这种情况 enable.auto.commit = false 打电话给我 commitSync() 在消费逻辑中

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题