从Kafka那里消费,没有无限循环

ou6hu8tu  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(412)

我目前正在使用confluent kafka python客户机来使用来自kafka主题的消息,代码在 while True 循环,如文档中的示例所示。不过,我想设置一个cron作业,每天只使用一次主题。我们的想法是,作业将在早上检查主题,在该时间点消耗主题中的所有消息,然后停止。我试着用python实现这个:

msg = kafka_consumer.consume()
while msg:
  msg_val = msg.value().decode('utf-8')
  // do something with msg
  msg = kafka_consumer.consume()

问题是它永远不会消耗任何东西。我猜第一句话在第一次尝试时永远不会得到信息。它只适用于 while True 但我不想让这个代码无限期地运行,直到那个时间点的最后一条消息被使用为止。

avkwfej4

avkwfej41#

您可以检查循环中消费者组的偏移量,然后在“结束”的某个阈值内打破循环
你可能还想和 max.poll.records 消费者配置,使您能够更好地控制返回的记录数

相关问题