我想在分布式微服务体系结构上实现kafka的消息传递。
我使用的是pykafka,实现了虚拟生产者和(平衡的)消费者。我将所有消费者分配到同一个消费者组。我可以同时使用python和console中的生产者,甚至运行时添加它们。
但是,我对消费者有一个问题。我可以创建多个python使用者,甚至可以在运行时添加它们。但是,当我将console consumer(kafka console consumer)添加到python consumer的组中时,我得到了mutex错误:
提交来自消费者id“b'michals-macbook-pro”的主题“b'michal\u sample\u topic”的偏移量时出错。local:1722eea0-07d3-4be4-9d97-8b7fb15b0b30“”(错误:{'pykafka.exceptions.unknownmemberid':[0,1]})
此外,这两个用户(即使他们属于同一个用户组)都在使用消息(python用户在自己之间平衡消息,而控制台用户在自己之间平衡消息)
现在,我对Kafka还很陌生,但我的第一印象是,Kafka应该对消费者的实现不可知,所以将它们结合起来应该是可能的。这个问题在我的理解中,是皮Kafka还是我对皮Kafka的实现?
制作人:
from pykafka import KafkaClient
from time import sleep
client = KafkaClient(hosts="localhost:9092")
print(client.brokers)
print(client.topics)
topic = client.topics[b'michal_sample_topic']
with topic.get_sync_producer() as producer:
while True:
producer.produce(
bytes(
input('Send test message:'),
'utf-8'
)
)
消费者:
from pykafka import KafkaClient
client = KafkaClient(hosts="localhost:9092")
print(client.brokers)
print(client.topics)
topic = client.topics[b'michal_sample_topic']
balanced_consumer = topic.get_balanced_consumer(
consumer_group=b'testing',
auto_commit_enable=True,
zookeeper_connect='localhost:2181'
)
for message in balanced_consumer:
if message is not None:
print(f'{message.offset} {message.value}')
暂无答案!
目前还没有任何答案,快来回答吧!