我目前正在使用confluent kafka python客户机来使用来自kafka主题的消息,代码在 while True
循环,如文档中的示例所示。不过,我想设置一个cron作业,每天只使用一次主题。我们的想法是,作业将在早上检查主题,在该时间点消耗主题中的所有消息,然后停止。我试着用python实现这个:
msg = kafka_consumer.consume()
while msg:
msg_val = msg.value().decode('utf-8')
// do something with msg
msg = kafka_consumer.consume()
问题是它永远不会消耗任何东西。我猜第一句话在第一次尝试时永远不会得到信息。它只适用于 while True
但我不想让这个代码无限期地运行,直到那个时间点的最后一条消息被使用为止。
1条答案
按热度按时间avkwfej41#
您可以检查循环中消费者组的偏移量,然后在“结束”的某个阈值内打破循环
你可能还想和
max.poll.records
消费者配置,使您能够更好地控制返回的记录数