kafka循环分区器没有将消息分发到所有分区

v9tzhpje  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(464)

我尝试使用kafka的roundrobinpartitioner类在所有分区上均匀地分布消息。我的Kafka主题配置如下:
名称:multischemakafkatopicodd
分区数:16
复制因子:2
比方说,如果我要生成100条消息,那么每个分区应该有6或7条消息。但是,我得到了类似的东西:

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0

我想可能是我没有生成足够的消息,所以我尝试了1m记录,并将分区数设置为奇数:
主题:multischemakafkatopicodd
分区数:31
复制因子:2
…我得到了这个。这一次,每个分区中的消息数是均匀分布的。

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684

我再次进行了相同的测试,但将分区的数量减少到8,得到的结果是,我们可以清楚地看到,一些分区有接近15k的消息,而另一些分区有大约10k的消息:

multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599

我做错什么了吗?还是应该这样做?为什么信息分布如此不均?
如果有人能帮我,那就太好了。谢谢。

23c0lvtd

23c0lvtd1#

据我所知,分区器工作得很好。但您必须了解生产商为了最大限度地提高性能所做的优化:
生产者不会为每个send调用将每条消息生成到不同的分区,因为这样做太过杀伤力了。 Round-Robin 保证类似的分发,但可以发送批处理。这意味着,它将缓冲一定数量的消息,这些消息将根据 remainder (不是模量!)在 RoundRobinPartitioner 的代码:

int part = Utils.toPositive(nextValue) % availablePartitions.size();
``` `nextValue` 是一个 `AtomicInteger` 对于每个分区/发送调用递增1。因此,余数将始终以1递增(以循环方式,例如使用4个分区: `0-1-2-3-0-1-2-3-...` )同样,假设在此过程中没有分区被声明为不可用。如果真是这样的话,这个周期可能看起来像 `0-1-2-(partition4fails)-0-1-2-(partition4OK)-3-0-...` ###示例
具有4个分区的主题
每个分区的producer partitioner线程缓冲区包含3条消息
(消息编号计数器以0开始- `new AtomicInteger(0)` )

MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
4%4 0
5%4 1
6%4 2
7%4 3
8%4 0
... ...

当生成第9条消息时,第一个分区的缓冲区就完成了(因为它已经保存了3条消息),因此可以发送给kafka。如果在此处停止进程,则4个分区将如下所示:

Partition Offset
0 3
1 0
2 0
3 0

在生成第10条消息时,第二个分区的缓冲区也将准备好从连线中发送出去,主题如下所示:

Partition Offset
0 3
1 3
2 0
3 0

在现实生活中,缓冲区通常保存大量的消息(这也可以是隧道)。例如,假设存储了1000条消息。对于相同的场景,分区如下所示:

Partition Offset
0 1000
1 1000
2 0
3 0

因此增加了分区之间的“视觉”差异。批处理大小/缓冲区大小越大,问题就越严重。
这与制片人的工作性质有关 `partitioner` 线程本身:默认情况下,它不会独立地发送每条消息,而是存储它们,以便在每次代理调用时发送多条消息,从而优化系统性能。
批处理是提高效率的主要驱动因素之一,为了实现批处理,kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批
如果生产者被停止/启动,这种不平衡可能会更加臭名昭著,因为它将重新启动机制,而不管先前选择的分区是什么(因此它可以开始发送到在停止之前刚刚选择的同一个分区,从而从上一次执行开始增加与其他未选择分区的差异)。
在新的执行中,缓冲区将全部为空,因此无论哪个分区接收的最多,进程都将重新启动。
所以,你在这里停止这个过程:

Partition Offset
0 1000
1 1000
2 0
3 0

保存每个主题的消息数计数器的Map将重新启动,因为它不是代理的一部分,而是生产者的partitioner类的一部分。如果生产者没有正确关闭和/或刷新,那些缓存的消息也将丢失。因此,在这个场景中,您得到的是前面逻辑的重复:

MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
(...)

在某个时刻会导致这种情况:

Partition Offset
0 2000
1 2000
2 0
3 0

这是由于发送过程的非连续执行而产生的不平衡,但是对于 `RoundRobinPartitioner` ,其本质是基于一个连续的过程(不间断)。
您可以通过在发送消息时检查每个分区的偏移量来验证此行为:只有当所选分区存储n条消息时,下一个所选分区才会获得其批处理的n条消息。
注:示例中显示的数字表示“完美”场景;在现实生活中,消息也可以被撤销、压缩、失败、刷新,而不管缓冲区大小、分区不可用,。。。导致偏移数字,如您的问题中所示。
最后一个刷新场景示例:

Partition Offset
0 1000
1 1000
2 0
3 0

进程已停止,但生产者已正确关闭并刷新其消息,因此主题如下所示:

Partition Offset
0 1997
1 1996
2 999
3 998

进程将重新启动。刷新第一个分区的缓冲区后,将显示如下:

Partition Offset
0 2997
1 1996
2 999
3 998

从而加剧了人们对该机制“公平”的困惑。但这不是它的错,因为分区器的Map、计数器和缓冲区中没有持久性。如果让进程连续几天不停地执行,您会发现它确实以一种“近乎相等”的方式平衡了消息。

### 圆桌会议参与者的相关方法:

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster)
{
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List availablePartitions=cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
/remainder calculus in order to select next partition/
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}

private int nextValue(String topic)
{
/Counter of num messages sent. topicCounterMap is part of the producer
process, hence not persisted by default.
It will start by 0 for every topic with each new launch
/
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0); });
return counter.getAndIncrement();
}

相关问题