我使用kafka-python来获取消费者正在使用的最后一个offset:
def get_consumer_group_offset(group):
topic = ''
g = create_admin()
returnlist = []
counter = 0
end_offsets = g.list_consumer_group_offsets(group)
print(end_offsets)
offsets = list(end_offsets.values())
offsets_u = []
for n in offsets:
offsets_u.append(n.offset)
print("offsets_u")
print(offsets_u)
for f in end_offsets:
topic = f.topic
value = []
value.append(f.partition)
value.append(offsets_u[counter])
counter = counter + 1
returnlist.append(value)
print(returnlist)
return topic, returnlist
但是我得到了以下输出:(元素#1表示分区,因为您可以看到某些分区由于某种原因被跳过)
[[1, 301], [7, 297], [10, 257], [12, 776], [14, 569], [16, 256], [20, 5], [22, 262], [28, 247]]
1条答案
按热度按时间m0rkklqb1#
我也有同样的问题。事实证明,这是故意的行为。
引用自函数
list_consumer_group_offsets
的描述:未指定且group_id没有记录偏移量的分区将被忽略。
参见
https://github.com/dpkp/kafka-python/issues/2307