kafka消费者在重新启动时使用旧消息

mnemlml8  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(761)

我正在使用来自某个主题的kafka消息,但问题是每次消费者重新启动时,它都会读取较旧的已处理消息。
我用过 auto.offset.reset=earliest . 使用commit async手动设置它能帮助我克服这个问题吗?
我看到Kafka已经启用了自动提交 true 默认情况下。

yqyhoc1h

yqyhoc1h1#

我用过 auto.offset.reset=earliest . 使用commit async手动设置它可以帮助我克服这个问题吗?
当设置 auto.offset.reset=earliest 设置时,使用者将从可用的最早偏移量而不是从最后一个偏移量读取。所以,当你第一次用一个新的 group.id 把这个设定为 earliest 它将从起始偏移量开始读取。
下面是我们如何调试这个问题。。
如果你的消费者 group.id 在每次重新启动时都是相同的,您需要检查是否确实发生了提交。
如果要手动重写,请交叉检查 enable.auto.commitfalse 任何地方。
接下来,检查自动提交间隔( auto.commit.interval.ms )默认情况下 5 sec 并查看是否已将其更改为更高的值,以及是否正在触发提交之前重新启动进程。
你也可以使用 commitAsync() 甚至 commitSync() 手动触发。使用 commitSync() (阻塞调用)用于测试提交时是否有异常。提交过程中可能出现的错误很少(来自文档) CommitFailedException -当您尝试提交到不再分配给此使用者的分区时,例如,由于使用者不再是组的一部分,将引发此异常 RebalanceInProgressException -如果使用者示例正处于重新平衡过程中,因此尚未确定哪些分区将分配给使用者。 TimeoutException -如果指定的超时 default.api.timeout.ms 在成功完成偏移提交之前过期
除此之外。。
同时检查你是否正在做 seek() 或者 seekToBeginning() 在你的消费代码里。如果你这么做并打电话 poll() 你可能也会收到旧消息。
如果您正在使用嵌入式kafka并进行一些测试,那么每次您重新启动测试时,都可能会创建主题和使用者组,从“开始”开始阅读。检查是否是类似情况。
如果不仔细检查代码,就很难判断到底是什么错误。这个答案只提供了调试场景的细节。

相关问题