kafka流中获得提交的偏移量?

jjhzyzn0  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(273)

背景:
将使用者拦截器设置为streamsconfig将确保在使用/提交消息时调用拦截器。来自的代码段 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync ```
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return true;
}

但是 `consumerInterceptor.onCommit()` 从未调用,即使我看到偏移量在源主题中提交。
问题:
我想这是因为我使用的kstreams只启用了一次处理保证。
这就是当时的逻辑 `org.apache.kafka.streams.processor.internals.StreamTask#commit` ```
if (this.eosEnabled) {
            this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
            this.producer.commitTransaction();
            if (startNewTransaction) {
                this.producer.beginTransaction();
            }
        } else {
            this.consumer.commitSync(consumedOffsetsAndMetadata);
        }

如你所见, consumer.commitSync 依次称为 consumerCoordinator.commit 这就叫 interceptor.onCommit ,不会被调用,因为在启用eos的情况下,调用的是事务api。
问:当在启用eos的源主题上提交偏移量时,有没有一种方法可以将回调挂接到kstream?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题