我们有一个使用React堆Kafka和 KafkaReceiver
对于消费,我们希望收集并发布基本的消费指标。看来我们可以利用 KafkaReceiver.doOnConsumer()
像这样:
receiver.doOnConsumer(Consumer::metrics)
.flatMapIterable(Map::entrySet)
.map(m -> Tuples.of(m.getKey(), m.getValue()))
如果这是最好的方法,我不知道什么是最好的方法来做这个定期。
我注意到还有一个版本的 KafkaReceiver.create()
接受自定义 ConsumerFactory
,也许有某种方法可以使用它在创建时用测微计注册潜在的Kafka消费者?我是新来的Spring Boot和相对新来KafkaReact堆,所以我不完全确定。
以下是我迄今为止的代码片段,以了解更多上下文:
KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux -> partitionFlux.publishOn(this.scheduler)
.map(r -> processEvent(partitionFlux.key(), r))
.concatMap(this::commit))
.doOnCancel(this::close)
.doOnError(error -> LOG.error("An error was encountered", error))
.blockLast();
如果拿着 doOnConsumer()
这种方法很有道理我们可以 doOnNext()
但是我们会收集和发布每个事件的指标,这太多了,如果我们可以错开和批处理会更好。
任何建议或提示,谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!