我有一个关于Kafka的主题 N
分区。记录键是一个cookie。记录按以下公式分布在分区中 hash(key) % N
.
我想以并行方式处理分区中的记录。假设每个分区都有 M
被指派进行加工的工人。另一个要求是,同一个cookie由单个工作线程处理(以保持事件顺序)。
我不知道是什么原因 hash
用于对记录进行分区的函数。计算 my_hash(key) % M
如果我使用 my_hash == hash
以及 gcd(N, M) > 1
(尤其是在 N = M
).
因为我知道 partitionId
对于所有的记录,我最初的想法是计算 my_hash(key + "." + partitionId) % M
但我想知道这种扣扣是否足够好。有可能 hash(key) == h1(key + "." + (h2(key) % N)) == my_hash(key + "." + partitionId)
.
我想我应该生成一个唯一的散列函数。你知道这种发电机吗?
编辑
例子:
Partition1: (cookie1, ...), (cookie1, ...), (cookie3, ...)
Partition2: (cookie2, ...), (cookie2, ...), (cookie4, ...)
我想要两条线( N = M
)正在处理每个分区。我不知道 hash
所以我会选择 my_hash == hash
.
然后我会得到:
Partition1_Subpartition1: (cookie1, ...), (cookie1, ...), (cookie3, ...)
Partition1_Subpartition2: <always_empty>
Partition2_Subpartition1: <always_empty>
Partition2_Subpartition2: (cookie2, ...), (cookie2, ...), (cookie4, ...)
而不是更好的分割,例如:
Partition1_Subpartition1: (cookie3, ...)
Partition1_Subpartition2: (cookie1, ...), (cookie1, ...)
Partition2_Subpartition1: (cookie4, ...)
Partition2_Subpartition2: (cookie2, ...), (cookie2, ...)
1条答案
按热度按时间kcrjzv8t1#
Kafka消费者需要在自己的独立线程中运行。不可能(或不建议)在多个消费者之间共享一个线程。所以,如果你有
M
线程,这意味着你有M
消费者。现在,我们来谈谈你的要求:我想用m个线程并行地读取这个主题,这样,具有相同cookie的所有记录都由一个线程读取。
这句话本身在我看来有点模糊不清。因为默认的散列函数确保相同的cookie总是会到达相同的分区,所以您的需求无论如何都会得到满足。
我想有2个线程(n=m)处理每个分区。
您的意思是,您希望每个分区由两个使用者线程处理吗?这是不可能的,除非他们在不同的消费群体,我认为这不是你想要的。
现在,您是否正试图基于某个函数(可能是时间戳或其他什么)将特定密钥(cookie)重定向到不同的分区,如果您知道,它可以转到集合中的任何分区(p1、p2、。。。pn)那么您想让一个使用者使用这n个分区吗?那么,如果同一个cookie的所有事件都出现在同一个分区中,那么与这种情况相比,您将获得什么呢?因为最终是同一个Kafka消费线程消费它。同样,我认为如果您的kafka使用者线程将处理作业委托给线程池(您可能正在谈论),那么无论您是从同一分区还是从一组不同的分区使用相同的密钥,线程池大小都将决定您将实现的并行度。