pykafka消费者与终端消费者发生冲突

9avjhtql  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(257)

我想在分布式微服务体系结构上实现kafka的消息传递。
我使用的是pykafka,实现了虚拟生产者和(平衡的)消费者。我将所有消费者分配到同一个消费者组。我可以同时使用python和console中的生产者,甚至运行时添加它们。
但是,我对消费者有一个问题。我可以创建多个python使用者,甚至可以在运行时添加它们。但是,当我将console consumer(kafka console consumer)添加到python consumer的组中时,我得到了mutex错误:
提交来自消费者id“b'michals-macbook-pro”的主题“b'michal\u sample\u topic”的偏移量时出错。local:1722eea0-07d3-4be4-9d97-8b7fb15b0b30“”(错误:{'pykafka.exceptions.unknownmemberid':[0,1]})
此外,这两个用户(即使他们属于同一个用户组)都在使用消息(python用户在自己之间平衡消息,而控制台用户在自己之间平衡消息)
现在,我对Kafka还很陌生,但我的第一印象是,Kafka应该对消费者的实现不可知,所以将它们结合起来应该是可能的。这个问题在我的理解中,是皮Kafka还是我对皮Kafka的实现?
制作人:

from pykafka import KafkaClient
from time import sleep

client = KafkaClient(hosts="localhost:9092")

print(client.brokers)
print(client.topics)

topic = client.topics[b'michal_sample_topic']

with topic.get_sync_producer() as producer:

while True:
    producer.produce(
        bytes(
            input('Send test message:'), 
            'utf-8'
        )
    )

消费者:

from pykafka import KafkaClient

client = KafkaClient(hosts="localhost:9092")

print(client.brokers)
print(client.topics)

topic = client.topics[b'michal_sample_topic']

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=b'testing',
    auto_commit_enable=True,
    zookeeper_connect='localhost:2181'
)

for message in balanced_consumer:
    if message is not None:
        print(f'{message.offset} {message.value}')

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题