具有动态并行消费者数的kafka工作队列

j13ufse2  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(610)

我想用Kafka来“分工”。我想将工作示例发布到一个主题,并运行一个相同的使用者云来处理它们。当每个消费者完成其工作时,它将从主题中提取下一个工作。每个作品只能由一个使用者处理一次。加工工作是昂贵的,所以我需要许多消费者在许多机器上运行,以跟上。我希望消费者的数量根据需要增长和收缩(我计划使用kubernetes来实现这一点)。
我发现了一个模式,其中为每个使用者创建了一个唯一的分区。这将“划分工作”,但分区的数量是在创建主题时设置的。此外,必须在命令行上创建主题,例如。

bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 3 --topic divide-topic --create --replication-factor 1

...

for n in range(0,3):
    consumer = KafkaConsumer(
                     bootstrap_servers=['localhost:9092'])
    partition = TopicPartition('divide-topic',n)
    consumer.assign([partition])
    ...

我可以为每个消费者创建一个独特的主题,并编写自己的代码为这些主题分配工作。这看起来很恶心,我仍然需要通过命令行创建主题。
具有动态数量的并行使用者的工作队列是一种常见的体系结构。我不能第一个需要这个。Kafka的正确做法是什么?

7xllpg7q

7xllpg7q1#

你发现的模式是准确的。注意,还可以使用kafka管理api创建主题,并且在创建主题后还可以添加分区(带有一些gotchas)。
在kafka中,划分工作并允许扩展的方法是使用分区。这是因为在使用者组中,每个分区在任何时候都由单个使用者使用。
例如,您可以有一个具有50个分区的主题和一个订阅此主题的使用者组:
当吞吐量较低时,组中只能有几个消费者,他们应该能够处理流量。
当吞吐量增加时,您可以添加使用者(最多为分区数(本例中为50个分区))来完成一些工作。
在这种情况下,50个消费者是扩展的极限。消费者公开了许多指标(比如lag),允许您随时决定是否有足够的指标

iaqfqrcu

iaqfqrcu2#

谢谢你,米克尔给我指出了正确的方向。
https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html

Kafka consumers are typically part of a consumer group. When multiple
consumers are subscribed to a topic and belong to the same consumer group,
each consumer in the group will receive messages from a different subset of
the partitions in the topic.

https://dzone.com/articles/dont-use-apache-kafka-consumer-groups-the-wrong-wa,

Having consumers as part of the same consumer group means providing the
“competing consumers” pattern with whom the messages from topic partitions
are spread across the members of the group. Each consumer receives messages 
from one or more partitions (“automatically” assigned to it) and the same
messages won’t be received by the other consumers (assigned to different 
partitions). In this way, we can scale the number of the consumers up to the
number of the partitions (having one consumer reading only one partition); in
this case, a new consumer joining the group will be in an idle state without 
being assigned to any partition.

用于在3个用户(最多100个)之间划分工作的示例代码:

bin/kafka-topics.sh --partitions 100 --topic divide-topic --create --replication-factor 1 --zookeeper localhost:2181

...

for n in range(0,3):
    consumer = KafkaConsumer(group_id='some-constant-group',
                     bootstrap_servers=['localhost:9092'])
    ...
bvpmtnay

bvpmtnay3#

我想,你走的路是对的-
这里有一些步骤-
创建kafka主题并创建所需的分区。分区数是并行度的单位。换句话说,您需要运行这么多的使用者来处理工作。
如果扩展需求增加,您可以增加分区。但它附带了一些警告,比如重新划分。请阅读Kafka关于新分区添加的文档。
为消费者定义Kafka消费者组。kafka将为消费者组中的可用消费者分配分区,并自动重新平衡。如果消费者被添加/删除,Kafka会自动进行再平衡。
如果消费者被打包为docker容器,那么使用kubernetes有助于管理容器,特别是对于多节点环境。其他工具包括docker swarm、openshift、mesos等。
Kafka提供分区的订购。
查看交付保证-至少一次,根据您的用例准确地检查一次。
或者,您可以使用kafka streams api。kafka streams是一个客户端库,用于处理和分析存储在kafka中的数据。它建立在重要的流处理概念之上,例如正确区分事件时间和处理时间、窗口支持、简单而有效的管理和应用程序状态的实时查询。

相关问题