我面对Kafka消费者非常奇怪的问题。我的设置如下。
<prop key="enable.auto.commit">true</prop>
<prop key="auto.commit.interval.ms">10</prop>
<prop key="auto.offset.reset">latest</prop>
我的组中有2个消费者对单个主题进行投票。我不做任何手动抵消管理在消费端。
现在,当我的消费者情绪低落,我的主题上几乎没有新消息发布时,通常我会在重新启动消费者时使用这些消息。
但有一次我注意到,在重启我的消费者之后,我无法从topic中使用这些新消息(在消费者停机且未轮询时发布)。当我再发布几条消息时,它开始从新的消息偏移量读取,我丢失了以前的消息,这些消息是在我的消费者停机时发布的。
请让我知道这种行为背后的可能原因。
2条答案
按热度按时间eufgjt7s1#
我终于找到了根本原因。默认的offsets.retention.minutes设置为1440分钟。如果我的使用者在重新启动后联机,并尝试在代理上查找最后一个提交的偏移量,那么它无法找到它,因为上面的默认过期,然后它开始从新到达的消息偏移量中提取。把这个保留期增加到4天对我有帮助。
更多详情请点击这里-https://cwiki.apache.org/confluence/display/kafka/kip-186%3a+increase+offsets+retention+default+to+7+days
pobjuy322#
默认情况下,使用者获取在建立连接后发布到kafka主题的消息。请尝试不带--from beging参数的生产者和kafka控制台使用者