在消费组再平衡回调函数中,当我想在kafka之外存储和获取偏移量时,如何获取组id

egdjgwm8  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(142)
consumer = Consumer({'bootstrap.servers': bootstrap_server_host,
                               'group.id': group_id,
                               'enable.auto.commit': auto_commit})

consumer.subscribe([topic], on_assign=on_assign_callback, on_revoke=on_revoke_callback);

def on_assign_callback(consumer, partitions):
    get consumer offset use primary key (consumer_group_id, topic, partition_id)

def on_revoke_callback(consumer, partitions):
   store consumer offset use primary key (consumer_group_id, topic, partition_id)

但我无法在这些回调函数中获取消费者组id

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题