版本:
Spring Boot:2.4.1
Kafka客户端:2.6.0
由于某些系统限制,kafka producer被配置为分别发送每条消息。具体来说,通过设置 linger.ms = 0
以及 max.batch.size = 0
. 因此,从kafka的客户机Angular 来看,每条消息都是新的批处理。我想在所有可用分区之间平均分配消息,这就是我配置客户端的原因 RoundRobinPartitioner
为了实现它。
然而,在测试配置时,当一个节点关闭并且我有偶数个分区时,我发现消息分布在所有可用分区的一半之间。这种行为的原因是双重调用 partitioner.partition(..)
在 KafkaProducer.doSend(..)
'. 自 RoundRobinPartitioner.partition()
在hood下,递增计数器并返回其除以可用分区数的剩余部分,在一个记录发布中调用它两次会导致跳过第二个分区。
例如,availablepartitions包含6个分区(1-6)。打电话 partition(..)
两次通过 KafkaProducer.doSend(..)
始终跳过1、3、5个分区。
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
..
int partition = partition(record, serializedKey, serializedValue, cluster);
..
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
//Since I disabled batches this if is always true
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
虽然它很容易重新实现,但我想知道是否需要这种行为?
如果当前批处理关闭,更新分区的原因是什么?
我相信我不是第一个,社区如何克服这种设计?
1条答案
按热度按时间kuuvgm7e1#
作为解决方法
UniformStickyPartitioner
可以使用。它做的正是所需要的:只在上选择新分区newBatch(..)
多次呼叫partition(..)
产生相同的输出。真正的循环机制的唯一区别是uniformstickypartitioner策略,分区计算为randomint%availablepartitions。但是,分配仍然相当平等。