以下是我目前所做的尝试:
from confluent_kafka import Consumer
c = Consumer({... several security/server settings skipped...
'auto.offset.reset': 'beginning',
'group.id': 'my-group'})
c.subscribe(['my.topic'])
msg = poll(30.0) # msg is of None type.
msg
几乎总是以None
结束,我认为问题可能是'my-group'
已经使用了'my.topic'
的所有消息......但是我不关心消息是否已经被使用--我仍然需要最新的消息,具体来说,我需要最新消息的时间戳。
我又试了一点,从这里看,主题中大概有25条信息,但我不知道如何获取它们:
a = c.assignment()
print(a) # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets) # Outputs: (25, 25)
如果没有任何消息是因为主题从来没有写过任何东西,我该如何确定呢?如果是这样,我该如何确定主题已经存在了多久呢?我希望写一个脚本,自动删除任何主题,没有写过在过去的X天(最初14-可能会调整它随着时间的推移)。
2条答案
按热度按时间iqjalb3h1#
我遇到了同样的问题,没有关于这个的例子。在我的例子中有一个分区,我需要阅读最后一条消息,从该消息中了解一些信息,以设置我拥有的消费者/生产者组件。
逻辑是启动
Consumer
,订阅主题,轮询消息-〉这将触发on_assign
,在那里通过分配回修改的分区来发生倒带。在on_assign
完成后,对msg
的轮询将继续,并从主题读取最后一条消息。现在
msg
里面有最后一条消息。ny6fqffe2#
如果任何人仍然需要一个例子的情况下,多个分区;我是这样做的: