autocommit是如何工作的,在springcloudstream中一次将轮询多少条消息?

6tdlim6h  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(609)

我查看了文档,它只说明了自动提交如何与 poll() 这里,以及如何配置轮询计数,这里。
所以当我使用 Flux ?
下面是我的消费代码。

@Bean
    fun consumerInboundMsg(handler: QueueHandler): java.util.function.Function<Flux<MessageRequest>, Mono<Void>> {
        return Function { flux ->
            flux.asFlow().flatMapMerge {
                flow {
                    handler.handleInboundRequest(it)
                    emit(it)
                }
            }.asFlux().then()
        }
    }
hc2pp10m

hc2pp10m1#

为了澄清,自动提交机制是由apachekafka的实现决定的。您使用的是springcloudstream-reactive或springcloudstream-non-reactive不会影响自动提交的工作方式。偏移量将在每次轮询中提交,并检查经过的时间是否大于 auto.commit.interval.ms .
如果提交间隔为5秒,轮询在7秒内发生,则提交将仅在7秒后发生。
要检查使用者提交偏移量的频率,请启用跟踪日志: logging.level.org.apache.kafka: trace

相关问题