如何将pykafka group consumer与gevent一起使用?

yeotifhr  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(406)

我将pykafka group consumer与gevent一起使用,但是结果有重复的数据。显示我的代码:

import gevent
from pykafka import KafkaClient

topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'

def get_consumer():
    client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
    topic = client.topics[topic_name.encode()]

    consumer = topic.get_simple_consumer(auto_commit_interval_ms=10000,
                                     consumer_group=group.encode(),
                                     auto_commit_enable=True,
                                     )
    return consumer

def worker(worker_id):
    consumer = get_consumer()
    for msg in consumer:
        print('worker {} partition: {}, offset: {}'.format(worker_id, msg.partition, msg.offset))

if __name__ == '__main__':
    tasks = [gevent.spawn(worker, *(i, )) for i in range(3)]
    ret = gevent.joinall(tasks)

鲁尔特:任何人都可以告诉我如何让它工作,皮Kafka不支持gevent吗?

kmb7vmvb

kmb7vmvb1#

我打赌这个问题和你使用gevent没有任何关系。您注意到跨消费者的重复数据的原因是您使用的是 SimpleConsumer 而不是 BalancedConsumer . SimpleConsumer 不执行自动平衡-它只是从其起始偏移量开始消耗整个主题。如果你有很多 SimpleConsumer 示例并排运行就像您在这里所做的那样,每个示例都将从其起始偏移量开始消耗整个主题。 BalancedConsumer ( topic.get_balanced_consumer(consumer_group='mygroup') )可能就是你想要的。它使用消费者再平衡算法来确保在同一组中运行的消费者不会收到相同的消息。要使其正常工作,您的主题至少需要有与使用它的进程相同数量的分区。有关更多信息,请参阅pykafka自述和文档。

相关问题