使用reactor kafka定期轮询消费者指标

fdx2calv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(289)

我们有一个使用React堆Kafka和 KafkaReceiver 对于消费,我们希望收集并发布基本的消费指标。看来我们可以利用 KafkaReceiver.doOnConsumer() 像这样:

receiver.doOnConsumer(Consumer::metrics)
                .flatMapIterable(Map::entrySet)
                .map(m -> Tuples.of(m.getKey(), m.getValue()))

如果这是最好的方法,我不知道什么是最好的方法来做这个定期。
我注意到还有一个版本的 KafkaReceiver.create() 接受自定义 ConsumerFactory ,也许有某种方法可以使用它在创建时用测微计注册潜在的Kafka消费者?我是新来的Spring Boot和相对新来KafkaReact堆,所以我不完全确定。
以下是我迄今为止的代码片段,以了解更多上下文:

KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
    .receive()
    .groupBy(m -> m.receiverOffset().topicPartition())
    .flatMap(partitionFlux -> partitionFlux.publishOn(this.scheduler)
        .map(r -> processEvent(partitionFlux.key(), r))
        .concatMap(this::commit))
    .doOnCancel(this::close)
    .doOnError(error -> LOG.error("An error was encountered", error))
    .blockLast();

如果拿着 doOnConsumer() 这种方法很有道理我们可以 doOnNext() 但是我们会收集和发布每个事件的指标,这太多了,如果我们可以错开和批处理会更好。
任何建议或提示,谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题