我正在尝试使用带有自动确认的reactor kafka来实现kafka主题分区的并发处理。这里的文档让人觉得这是可能的:
http://projectreactor.io/docs/kafka/milestone/reference/#concurrent-订购
这和我尝试的唯一区别是我使用的是自动确认。
我有以下代码(相关方法是 receiveAuto
):
public class KafkaFluxFactory<K, V> {
private final Map<String, Object> properties;
public KafkaFluxFactory(Map<String, Object> properties) {
this.properties = properties;
}
public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, Scheduler scheduler) {
return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics))
.receiveAutoAck()
.flatMap(flux -> flux.groupBy(this::extractTopicPartition))
.flatMap(topicPartitionFlux -> topicPartitionFlux.publishOn(scheduler));
}
private TopicPartition extractTopicPartition(ConsumerRecord<K, V> record) {
return new TopicPartition(record.topic(), record.partition());
}
}
当我用这个来创建一个来自kafka的消费者记录流时,我使用了一个并行调度程序( Schedulers.newParallel("debug", 10)
),我看到它们最终都在同一个线程上处理。
你觉得我做错什么了吗?
2条答案
按热度按时间unguejic1#
经过一段时间的反复尝试,再加上对我想要完成的事情的重新思考,我意识到我试图在一段代码中解决两个问题。
我需要的两件事是:
Kafka分区的顺序处理
能够并行处理每个分区
在试图用这段代码解决这两个问题时,我限制了下游用户配置并行化级别的能力。因此,我将方法更改为返回groupedfluxes的流量,它为下游用户提供了确定什么是可并行的正确粒度:
在下游,用户可以使用任何他们想要的调度器来并行化每个发出的groupedflux:
这样可以按顺序并与其他groupedflux并行地处理每个topicpartition groupedflux。
w8ntj3qf2#
我猜它至少在你的消费者中是按顺序执行的。要做平行消耗,你应该把通量转换成
ParallelFlux
```public ParallelFlux<ConsumerRecord<K, V>> receiveAuto(Collection topics, Scheduler scheduler) {
return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics))
.receiveAutoAck()
.flatMap(flux -> flux.groupBy(this::extractTopicPartition))
.flatMap(topicPartitionFlux -> topicPartitionFlux.parallel().runOn(Schedulers.parallel()));
}
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable>
onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe)
void subscribe(Subscriber<? super T> s)