我将pykafka group consumer与gevent一起使用,但是结果有重复的数据。显示我的代码:
import gevent
from pykafka import KafkaClient
topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'
def get_consumer():
client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
topic = client.topics[topic_name.encode()]
consumer = topic.get_simple_consumer(auto_commit_interval_ms=10000,
consumer_group=group.encode(),
auto_commit_enable=True,
)
return consumer
def worker(worker_id):
consumer = get_consumer()
for msg in consumer:
print('worker {} partition: {}, offset: {}'.format(worker_id, msg.partition, msg.offset))
if __name__ == '__main__':
tasks = [gevent.spawn(worker, *(i, )) for i in range(3)]
ret = gevent.joinall(tasks)
鲁尔特:任何人都可以告诉我如何让它工作,皮Kafka不支持gevent吗?
1条答案
按热度按时间kmb7vmvb1#
我打赌这个问题和你使用gevent没有任何关系。您注意到跨消费者的重复数据的原因是您使用的是
SimpleConsumer
而不是BalancedConsumer
.SimpleConsumer
不执行自动平衡-它只是从其起始偏移量开始消耗整个主题。如果你有很多SimpleConsumer
示例并排运行就像您在这里所做的那样,每个示例都将从其起始偏移量开始消耗整个主题。BalancedConsumer
(topic.get_balanced_consumer(consumer_group='mygroup')
)可能就是你想要的。它使用消费者再平衡算法来确保在同一组中运行的消费者不会收到相同的消息。要使其正常工作,您的主题至少需要有与使用它的进程相同数量的分区。有关更多信息,请参阅pykafka自述和文档。