from confluent_kafka import Consumer

def on_assign_callback(consumer, partitions):
    # Get the consumer offset using the primary key (consumer_group_id, topic, partition_id)
    pass

def on_revoke_callback(consumer, partitions):
    # Store the consumer offset using the primary key (consumer_group_id, topic, partition_id)
    pass

consumer = Consumer({'bootstrap.servers': bootstrap_server_host,
                     'group.id': group_id,
                     'enable.auto.commit': auto_commit})
consumer.subscribe([topic], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
But I can't get the consumer group id inside these callback functions.
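
One possible workaround, sketched here as an assumption rather than a confirmed answer: since the callbacks only receive `consumer` and `partitions`, the group id can be captured in a closure when the callbacks are created. The factory name `make_rebalance_callbacks` and the dict-like `offset_store` are hypothetical, standing in for whatever external storage is actually used; the sketch relies only on `consumer.assign()`, `consumer.position()` and the `topic`, `partition`, `offset` attributes of the partition objects.

```python
def make_rebalance_callbacks(group_id, offset_store):
    """Build on_assign/on_revoke callbacks that remember group_id.

    offset_store is a hypothetical dict-like store keyed by
    (consumer_group_id, topic, partition_id).
    """

    def on_assign(consumer, partitions):
        # Restore a saved offset for each newly assigned partition, if any.
        for p in partitions:
            key = (group_id, p.topic, p.partition)
            if key in offset_store:
                p.offset = offset_store[key]
        # Apply the (possibly adjusted) offsets to the assignment.
        consumer.assign(partitions)

    def on_revoke(consumer, partitions):
        # Save the current position of each partition being revoked.
        for p in consumer.position(partitions):
            # Negative offsets are placeholders (no position known yet); skip them.
            if p.offset >= 0:
                offset_store[(group_id, p.topic, p.partition)] = p.offset

    return on_assign, on_revoke


# Usage (assumes the consumer, group_id and topic from the question):
# on_assign_cb, on_revoke_cb = make_rebalance_callbacks(group_id, {})
# consumer.subscribe([topic], on_assign=on_assign_cb, on_revoke=on_revoke_cb)
```

`functools.partial` would work equally well for binding `group_id`; the closure is used here only to keep both callbacks and the shared store in one place.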