我需要对Kafka进行民意调查并批量处理事件。在React堆Kafka,因为它是一个蒸汽api,我得到的事件流。有没有一种方法可以组合并获得固定的最大事件大小。
这就是我目前正在做的。
final Flux<Flux<ConsumerRecord<String, String>>> receive = KafkaReceiver.create(eventReceiverOptions)
.receiveAutoAck();
receive
.concatMap(r -> r)
.doOnEach(listSignal -> log.info("got one message"))
.map(consumerRecords -> consumerRecords.value())
.collectList()
.flatMap(strings -> {
log.info("Read messages of size {}", strings.size());
return processBulkMessage(strings)
.doOnSuccess(aBoolean -> log.info("Processed records"))
.thenReturn(strings);
}).subscribe();
但是代码只是挂在collectlist之后,永远不会转到最后一个flatmap。
提前谢谢。
1条答案
按热度按时间np8igboo1#
你只要用你的平原做一个“压平”
.concatMap(r -> r)
因此,您完全消除了最初由它构建的批处理receiveAutoAck()
. 有一个列表流为您的processBulkMessage()
要处理,请考虑将所有批处理逻辑移到concatMap()
: