Kafka主题的最后一条消息没有被消费掉,即使它们应该被消费掉,这留下了持续的消费滞后

wz3gfoph  于 2023-02-21  发布在  Apache
关注(0)|答案(1)|浏览(201)

我有一个非常基本的Kafka消费者,它需要消费来自32个分区主题的数据,每个分区上都有大量数据。
它设法消耗来自该主题的大部分数据,但是一旦我们到达每个分区的末尾,它就不会完全到达它的末尾,并且总是保持一个小的滞后,而不是到达该分区的最新偏移。
每次我重新启动消费者时,它都会从这些分区中的一些分区消费,从而将延迟减少到0,但不是所有分区。
下面是重现此错误的最小消耗代码:

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "group.id": "group-id",
})

consumer.subscribe(["topic"])

while True:
    batch = consumer.consume(timeout=1, num_messages=100)
    if batch:
        consumer.commit(batch[-1])
1mrurvl1

1mrurvl11#

在尝试将min.fetch.bytes显式设置为1以确保我的代理没有保存数据,并尝试重构原始代码之后,我注意到我只提交了批处理中接收到的最后一条消息:而且由于某种原因,我下意识地假设一批中的所有消息都来自同一个分区,但我错了!
确保提交所有分区的偏移量,这些分区至少负责批处理中的一条消息,这解决了我的问题:

partitions_to_commit = {m.partition(): m for m in batch}
for message in partitions_to_commit.values():
    consumer.commit(message)

相关问题