Python Kafka消费者组ID问题

8cdiaqws  于 2023-01-29  发布在  Python
关注(0)|答案(4)|浏览(218)

阿法克,
kafka中的分区和(消费者)组的概念是为了实现并行性而引入的,我正在通过python和kafka一起工作,我有某个主题,它有(比方说)2个分区,这意味着,如果我启动一个消费者组,里面有2个消费者,它们将被Map(订阅)到不同的分区。
但是,在python中使用kafka库时,我遇到了一个奇怪的问题,我用基本相同的group-id启动了两个消费者,并为他们启动了消费消息的线程。
但是,kafka-stream中的每一条消息都被它们两个使用!!这对我来说似乎很荒谬,甚至在概念上是不正确的。有没有什么方法可以手动地将使用者Map到某些(不同的)分区(如果它们没有自动Map到不同的分区)?
下面是代码:

from kafka import KafkaConsumer
import thread

def con1(consumer):
    for msg in consumer:
        print msg

consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])

thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))

下面是我使用kafka-console-producer生成的一些消息的输出:

ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')

而预期的是每一个。顺便说一句,这个主题k-test有2个分区。

fjnneemd

fjnneemd1#

根据我的经验,密钥必须长于4个字符,否则所有内容都将转到分区0

uxhixvfz

uxhixvfz2#

我猜您使用的是Kafka 0.8或更低版本,基于documents,它们不支持此功能:
...但是,某些功能只能在较新的代理上启用;例如,完全协调的消费者组--即对同一组中的多个消费者进行动态分区分配--需要使用0.9+ Kafka代理......

yvt65v4c

yvt65v4c3#

from kafka import KafkaConsumer
from kafka import TopicPartition

TOPIC = "k-test"
PARTITION_0 = 0
PARTITION_1 = 1

consumer_0 = KafkaConsumer(
    TOPIC, group_id='grp1', bootstrap_servers=['10.50.23.120:9092']
)
consumer_1 = KafkaConsumer(
    TOPIC, group_id='grp1', bootstrap_servers=['10.50.23.120:9092']
)
topic_partition_0 = TopicPartition(TOPIC, PARTITION_0)
topic_partition_1 = TopicPartition(TOPIC, PARTITION_1)
# format: topic, partition
consumer_0.assign([topic_partition_0])
consumer_1.assign([topic_partition_1])

assign()可能对您有用,但是一旦您使用它,当有消费者停止工作时,Kafka将不会自动平衡消费者。

1tu0hz3e

1tu0hz3e4#

试着运行bin/kafka-consumer-groups.sh命令行工具来验证你正在使用的Python Kafka客户端是否支持正确的使用者组管理,如果两个使用者确实在同一个组中,那么他们应该从互斥的分区中获得消息。

相关问题