我有一个非常基本的Kafka消费者,它需要消费来自32个分区主题的数据,每个分区上都有大量数据。
它设法消耗来自该主题的大部分数据,但是一旦我们到达每个分区的末尾,它就不会完全到达它的末尾,并且总是保持一个小的滞后,而不是到达该分区的最新偏移。
每次我重新启动消费者时,它都会从这些分区中的一些分区消费,从而将延迟减少到0,但不是所有分区。
下面是重现此错误的最小消耗代码:
from confluent_kafka import Consumer
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"group.id": "group-id",
})
consumer.subscribe(["topic"])
while True:
batch = consumer.consume(timeout=1, num_messages=100)
if batch:
consumer.commit(batch[-1])
1条答案
按热度按时间1mrurvl11#
在尝试将
min.fetch.bytes
显式设置为1
以确保我的代理没有保存数据,并尝试重构原始代码之后,我注意到我只提交了批处理中接收到的最后一条消息:而且由于某种原因,我下意识地假设一批中的所有消息都来自同一个分区,但我错了!确保提交所有分区的偏移量,这些分区至少负责批处理中的一条消息,这解决了我的问题: