如何通过自动确认按主题和分区并发处理reactor kafka流?

zpf6vheq  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(436)

我正在尝试使用带有自动确认的reactor kafka来实现kafka主题分区的并发处理。这里的文档让人觉得这是可能的:
http://projectreactor.io/docs/kafka/milestone/reference/#concurrent-订购
这和我尝试的唯一区别是我使用的是自动确认。
我有以下代码(相关方法是 receiveAuto ):

public class KafkaFluxFactory<K, V> {

    private final Map<String, Object> properties;

    public KafkaFluxFactory(Map<String, Object> properties) {
        this.properties = properties;
    }

    public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, Scheduler scheduler) {
        return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics))
            .receiveAutoAck()
            .flatMap(flux -> flux.groupBy(this::extractTopicPartition))
            .flatMap(topicPartitionFlux -> topicPartitionFlux.publishOn(scheduler));
    }

    private TopicPartition extractTopicPartition(ConsumerRecord<K, V> record) {
        return new TopicPartition(record.topic(), record.partition());
    }
}

当我用这个来创建一个来自kafka的消费者记录流时,我使用了一个并行调度程序( Schedulers.newParallel("debug", 10) ),我看到它们最终都在同一个线程上处理。
你觉得我做错什么了吗?

unguejic

unguejic1#

经过一段时间的反复尝试,再加上对我想要完成的事情的重新思考,我意识到我试图在一段代码中解决两个问题。
我需要的两件事是:
Kafka分区的顺序处理
能够并行处理每个分区
在试图用这段代码解决这两个问题时,我限制了下游用户配置并行化级别的能力。因此,我将方法更改为返回groupedfluxes的流量,它为下游用户提供了确定什么是可并行的正确粒度:

public Flux<GroupedFlux<TopicPartition, ConsumerRecord<K, V>>> receiveAuto(Collection<String> topics) {
    return KafkaReceiver.create(createReceiverOptions(topics))
        .receiveAutoAck()
        .flatMap(flux -> flux.groupBy(this::extractTopicPartition));
}

在下游,用户可以使用任何他们想要的调度器来并行化每个发出的groupedflux:

public <V> void work(Flux<GroupedFlux<TopicPartition, V>> flux) {
    flux.doOnNext(groupPublisher -> groupPublisher
            .publishOn(Schedulers.elastic())
            .subscribe(this::doWork))
        .subscribe();
}

这样可以按顺序并与其他groupedflux并行地处理每个topicpartition groupedflux。

w8ntj3qf

w8ntj3qf2#

我猜它至少在你的消费者中是按顺序执行的。要做平行消耗,你应该把通量转换成 ParallelFlux ```
public ParallelFlux<ConsumerRecord<K, V>> receiveAuto(Collection topics, Scheduler scheduler) {
return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics))
.receiveAutoAck()
.flatMap(flux -> flux.groupBy(this::extractTopicPartition))
.flatMap(topicPartitionFlux -> topicPartitionFlux.parallel().runOn(Schedulers.parallel()));
}

在消费函数中输入后,如果要以并行方式消费,应使用以下方法:

void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable>
onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe)

或任何其他重载方法 `Consumer<T super T>` onnext参数。如果你只使用下面的方法,你将按顺序消耗通量

void subscribe(Subscriber<? super T> s)

相关问题