kafkaproducer.dosend:双重调用partitioner会导致分布不均

eeq64g8w  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(369)

版本:
Spring Boot:2.4.1
Kafka客户端:2.6.0
由于某些系统限制,kafka producer被配置为分别发送每条消息。具体来说,通过设置 linger.ms = 0 以及 max.batch.size = 0 . 因此,从kafka的客户机Angular 来看,每条消息都是新的批处理。我想在所有可用分区之间平均分配消息,这就是我配置客户端的原因 RoundRobinPartitioner 为了实现它。
然而,在测试配置时,当一个节点关闭并且我有偶数个分区时,我发现消息分布在所有可用分区的一半之间。这种行为的原因是双重调用 partitioner.partition(..)KafkaProducer.doSend(..) '. 自 RoundRobinPartitioner.partition() 在hood下,递增计数器并返回其除以可用分区数的剩余部分,在一个记录发布中调用它两次会导致跳过第二个分区。
例如,availablepartitions包含6个分区(1-6)。打电话 partition(..) 两次通过 KafkaProducer.doSend(..) 始终跳过1、3、5个分区。

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
 ..
 int partition = partition(record, serializedKey, serializedValue, cluster);
 ..
 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

//Since I disabled batches this if is always true
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

虽然它很容易重新实现,但我想知道是否需要这种行为?
如果当前批处理关闭,更新分区的原因是什么?
我相信我不是第一个,社区如何克服这种设计?

kuuvgm7e

kuuvgm7e1#

作为解决方法 UniformStickyPartitioner 可以使用。它做的正是所需要的:只在上选择新分区 newBatch(..) 多次呼叫 partition(..) 产生相同的输出。真正的循环机制的唯一区别是uniformstickypartitioner策略,分区计算为randomint%availablepartitions。但是,分配仍然相当平等。

相关问题