我有以下代码来创建pykafka中的消费者平衡:
consumer = topic.get_balanced_consumer(consumer_group='Testing',auto_commit_enable=True,zookeeper_connect='amsmgmt002:2181,nas5:2181,amsdblx006:2181')
如何跨流程管理使用者的状态?谢谢
wf82jlnq1#
从皮Kafka自述:使用某个主题的balancedconsumer示例的数量可以与该主题具有分区的数量相同。如果它们都连接到同一个zookeeper示例,它们将与该示例通信以自动平衡它们之间的分区。balancedconsumer使用的分区分配策略默认为“range”策略。该策略可以通过membership\u protocol关键字参数进行切换,可以是pykafka.membershipprotocol公开的对象,也可以是pykafka.membershipprotocol.groupmembershipprotocol的自定义示例。
1条答案
按热度按时间wf82jlnq1#
从皮Kafka自述:
使用某个主题的balancedconsumer示例的数量可以与该主题具有分区的数量相同。如果它们都连接到同一个zookeeper示例,它们将与该示例通信以自动平衡它们之间的分区。balancedconsumer使用的分区分配策略默认为“range”策略。该策略可以通过membership\u protocol关键字参数进行切换,可以是pykafka.membershipprotocol公开的对象,也可以是pykafka.membershipprotocol.groupmembershipprotocol的自定义示例。