此问题已在此处有答案:
Consume from timestamp using offsets_for_times(2个答案)
33分钟前关闭
def kafkaa(self, auto_offset_reset, timeout=500):
group_name = "group name"
config = {"bootstrap.servers": "server",
"schema.registry.url": "url",
"group.id": group_name,
"enable.auto.commit": False,
"auto.offset.reset": False, // True
"sasl.mechanisms": "sasl",
"security.protocol": "protocol",
"sasl.username": "username",
"sasl.password": "pw"}
consumer = AvroConsumer(config)
data_consumed = []
consumer.subscribe(kafkaTopic)
while True:
if time.time() > time.time() + timeout:
break
else:
message = consumer.poll()
if message is not None:
kafka_ms.append(message)
consumer.commit(asynchronous=False)
consumer.close()
return data_consumed
`
当使用auto.offset.reset = latest时,不使用组ID,这不会返回任何值,因为流并不总是有消息要使用。
当使用auto.offset.reset = latest时,如果组ID是一个现有的组ID,则返回offset之后的所有内容,直到超时,但代理重新启动
1条答案
按热度按时间bis0qfac1#
基于时间戳
您需要使用
offsets_for_time
函数来查找任何主题的偏移量(针对该时间戳),然后在您可以从该时间戳开始轮询之前查找这些分区偏移量的消费者auto.offset.reset和group.id的值无关紧要,除非您希望提交偏移量,但这不是必需的
获取主题的高水印,然后减去1,并查找这些偏移量,以获取启动消费者之前发送的最后一条消息(实际上可能不是最后一条,因为生产者可能每秒发送数千个事件)