reactor kafka文档概述了按顺序使用kafka分区的消息的示例代码,但是在示例代码中,处理方法是同步的。根据一些局部测试,顺序处理和背压在这个特殊的样品中工作得非常好
public Flux<?> flux() {
Scheduler scheduler = Schedulers.newBoundedElastic(60, Integer.MAX_VALUE, "sample", 60, true);
return KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()))
.doOnCancel(() -> close());
}
public ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, Person> message) {
log.info("Processing record {} from partition {} in thread{}",
message.value().id(), topicPartition, Thread.currentThread().getName());
return message.receiverOffset();
}
我们的用例将处理逻辑作为一个异步函数,例如下面所示的例子,所以我们的例子中的processrecord返回一个mono。processrecord方法大约需要3-4秒才能完成,在这种情况下,流不接受背压。越来越多的消息被拉取,而不使用先前的消息。这会导致系统逐渐变得不稳定,最后出现outofmemory异常而失败。订货是尊重,但背压不是。
public Mono<ReceiverOffset> processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, Person> message) {
log.info("Processing record {} from partition {} in thread{}",
message.value().id(), topicPartition, Thread.currentThread().getName());
....
}
我们是否有一个使用reactor从kafka异步消费消息的示例
暂无答案!
目前还没有任何答案,快来回答吧!