我查看了文档,它只说明了自动提交如何与 poll()
这里,以及如何配置轮询计数,这里。
所以当我使用 Flux
?
下面是我的消费代码。
@Bean
fun consumerInboundMsg(handler: QueueHandler): java.util.function.Function<Flux<MessageRequest>, Mono<Void>> {
return Function { flux ->
flux.asFlow().flatMapMerge {
flow {
handler.handleInboundRequest(it)
emit(it)
}
}.asFlux().then()
}
}
1条答案
按热度按时间hc2pp10m1#
为了澄清,自动提交机制是由apachekafka的实现决定的。您使用的是springcloudstream-reactive或springcloudstream-non-reactive不会影响自动提交的工作方式。偏移量将在每次轮询中提交,并检查经过的时间是否大于
auto.commit.interval.ms
.如果提交间隔为5秒,轮询在7秒内发生,则提交将仅在7秒后发生。
要检查使用者提交偏移量的频率,请启用跟踪日志:
logging.level.org.apache.kafka: trace