Confluent-Kafka Python -描述消费者组(以获得每个消费者组的滞后)

oyxsuwqo  于 2022-11-28  发布在  Apache
关注(0)|答案(1)|浏览(250)

我想用confluent-kafka得到消费者组的详细信息。

./kafka-consumer-groups.sh --bootstrap-server XXXXXXXXX:9092 --describe --group my-group

我的最终目标是从输出中得到延迟的值。在confluent-kafka python API中有没有什么方法可以得到这些细节。在java API中有一个方法,但是我在python API中找不到。
我尝试在adminClient API中使用describe_configs方法,但它最终抛出了kafkaException,并显示了以下详细信息
发生这种情况的原因很可能是客户端库的请求格式不正确,或者消息发送到了不兼容的代理。有关详细信息,请参阅代理日志。

wmomyfyw

wmomyfyw1#

现在我提出了下面的解决方案。这是一个工作,以获得一个消费者群体的综合滞后

def get_lag(topic,numPartitions):
    diff = list()
    for i in range(numPartitions):
        topic_partition = TopicPartition(topic, partition=i)
        low, high = consumer.get_watermark_offsets(topic_partition)
        currentList = consumer.committed([topic_partition])
        current = currentList[0].offset
        diff.append(high-current)
    return sum(diff) # Combined Lag

相关问题