我正在使用来自某个主题的kafka消息,但问题是每次消费者重新启动时,它都会读取较旧的已处理消息。我用过 auto.offset.reset=earliest . 使用commit async手动设置它能帮助我克服这个问题吗?我看到Kafka已经启用了自动提交 true 默认情况下。
auto.offset.reset=earliest
true
yqyhoc1h1#
我用过 auto.offset.reset=earliest . 使用commit async手动设置它可以帮助我克服这个问题吗?当设置 auto.offset.reset=earliest 设置时,使用者将从可用的最早偏移量而不是从最后一个偏移量读取。所以,当你第一次用一个新的 group.id 把这个设定为 earliest 它将从起始偏移量开始读取。下面是我们如何调试这个问题。。如果你的消费者 group.id 在每次重新启动时都是相同的,您需要检查是否确实发生了提交。如果要手动重写,请交叉检查 enable.auto.commit 至 false 任何地方。接下来,检查自动提交间隔( auto.commit.interval.ms )默认情况下 5 sec 并查看是否已将其更改为更高的值,以及是否正在触发提交之前重新启动进程。你也可以使用 commitAsync() 甚至 commitSync() 手动触发。使用 commitSync() (阻塞调用)用于测试提交时是否有异常。提交过程中可能出现的错误很少(来自文档) CommitFailedException -当您尝试提交到不再分配给此使用者的分区时,例如,由于使用者不再是组的一部分,将引发此异常 RebalanceInProgressException -如果使用者示例正处于重新平衡过程中,因此尚未确定哪些分区将分配给使用者。 TimeoutException -如果指定的超时 default.api.timeout.ms 在成功完成偏移提交之前过期除此之外。。同时检查你是否正在做 seek() 或者 seekToBeginning() 在你的消费代码里。如果你这么做并打电话 poll() 你可能也会收到旧消息。如果您正在使用嵌入式kafka并进行一些测试,那么每次您重新启动测试时,都可能会创建主题和使用者组,从“开始”开始阅读。检查是否是类似情况。如果不仔细检查代码,就很难判断到底是什么错误。这个答案只提供了调试场景的细节。
group.id
earliest
enable.auto.commit
false
auto.commit.interval.ms
5 sec
commitAsync()
commitSync()
CommitFailedException
RebalanceInProgressException
TimeoutException
default.api.timeout.ms
seek()
seekToBeginning()
poll()
1条答案
按热度按时间yqyhoc1h1#
我用过
auto.offset.reset=earliest
. 使用commit async手动设置它可以帮助我克服这个问题吗?当设置
auto.offset.reset=earliest
设置时,使用者将从可用的最早偏移量而不是从最后一个偏移量读取。所以,当你第一次用一个新的group.id
把这个设定为earliest
它将从起始偏移量开始读取。下面是我们如何调试这个问题。。
如果你的消费者
group.id
在每次重新启动时都是相同的,您需要检查是否确实发生了提交。如果要手动重写,请交叉检查
enable.auto.commit
至false
任何地方。接下来,检查自动提交间隔(
auto.commit.interval.ms
)默认情况下5 sec
并查看是否已将其更改为更高的值,以及是否正在触发提交之前重新启动进程。你也可以使用
commitAsync()
甚至commitSync()
手动触发。使用commitSync()
(阻塞调用)用于测试提交时是否有异常。提交过程中可能出现的错误很少(来自文档)CommitFailedException
-当您尝试提交到不再分配给此使用者的分区时,例如,由于使用者不再是组的一部分,将引发此异常RebalanceInProgressException
-如果使用者示例正处于重新平衡过程中,因此尚未确定哪些分区将分配给使用者。TimeoutException
-如果指定的超时default.api.timeout.ms
在成功完成偏移提交之前过期除此之外。。
同时检查你是否正在做
seek()
或者seekToBeginning()
在你的消费代码里。如果你这么做并打电话poll()
你可能也会收到旧消息。如果您正在使用嵌入式kafka并进行一些测试,那么每次您重新启动测试时,都可能会创建主题和使用者组,从“开始”开始阅读。检查是否是类似情况。
如果不仔细检查代码,就很难判断到底是什么错误。这个答案只提供了调试场景的细节。