我尝试使用kafka的roundrobinpartitioner类在所有分区上均匀地分布消息。我的Kafka主题配置如下:
名称:multischemakafkatopicodd
分区数:16
复制因子:2
比方说,如果我要生成100条消息,那么每个分区应该有6或7条消息。但是,我得到了类似的东西:
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0
我想可能是我没有生成足够的消息,所以我尝试了1m记录,并将分区数设置为奇数:
主题:multischemakafkatopicodd
分区数:31
复制因子:2
…我得到了这个。这一次,每个分区中的消息数是均匀分布的。
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684
我再次进行了相同的测试,但将分区的数量减少到8,得到的结果是,我们可以清楚地看到,一些分区有接近15k的消息,而另一些分区有大约10k的消息:
multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599
我做错什么了吗?还是应该这样做?为什么信息分布如此不均?
如果有人能帮我,那就太好了。谢谢。
1条答案
按热度按时间23c0lvtd1#
据我所知,分区器工作得很好。但您必须了解生产商为了最大限度地提高性能所做的优化:
生产者不会为每个send调用将每条消息生成到不同的分区,因为这样做太过杀伤力了。
Round-Robin
保证类似的分发,但可以发送批处理。这意味着,它将缓冲一定数量的消息,这些消息将根据remainder
(不是模量!)在RoundRobinPartitioner
的代码:MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
4%4 0
5%4 1
6%4 2
7%4 3
8%4 0
... ...
Partition Offset
0 3
1 0
2 0
3 0
Partition Offset
0 3
1 3
2 0
3 0
Partition Offset
0 1000
1 1000
2 0
3 0
Partition Offset
0 1000
1 1000
2 0
3 0
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
(...)
Partition Offset
0 2000
1 2000
2 0
3 0
Partition Offset
0 1000
1 1000
2 0
3 0
Partition Offset
0 1997
1 1996
2 999
3 998
Partition Offset
0 2997
1 1996
2 999
3 998
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster)
{
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List availablePartitions=cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
/remainder calculus in order to select next partition/
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic)
{
/Counter of num messages sent. topicCounterMap is part of the producer
process, hence not persisted by default.
It will start by 0 for every topic with each new launch/
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0); });
return counter.getAndIncrement();
}