kafka自动提交偏移量

mum43rcc  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(648)

我使用的是SpringKafka1.2.2.release。我有一个kafka监听器作为消费者,它听一个主题并用elastic索引文档。我的自动提交偏移量属性设置为true//default。
我的印象是,如果侦听器中出现异常(elastic已关闭),则不应提交偏移量,并且应为下一次轮询处理相同的消息
但是,这并没有发生,消费者在下一次轮询时提交了偏移量。在阅读了文章和文档之后,我了解到,如果将auto commit设置为true,则下一次轮询将提交所有偏移量
我的疑问是,消费者为什么要调用下一次轮询,以及我如何防止任何偏移量使用auto commit提交到true,或者我是否需要将此属性设置为false并手动提交。

yqlxgs2m

yqlxgs2m1#

我宁愿把它设为假;容器为您管理偏移更可靠。
设置容器的 AckModeRECORD (默认为 BATCH )在侦听器返回后,容器将为您提交偏移量。
同时考虑升级到至少1.3.3(当前版本为2.1.4);由于kip-62,1.3.x引入了一个更简单的线程模型
编辑
使用自动提交,无论成功/失败,都将提交偏移量。容器在失败后不会提交,除非 ackOnError 是true(不使用自动提交的另一个原因)。
但是,这仍然没有帮助,因为代理不会再次发送相同的记录。您必须在服务器上执行查找操作 Consumer 为了这个。
在2.0.1(当前版本是2.1.4)中,我们添加了 SeekToCurrentErrorHandler 这将导致在下次轮询时重新发送失败和未处理的记录。参见参考手册。
你也可以使用 ConsumerAwareListener 执行seek yourself(也在2.0中添加)。
对于旧版本(>=1.1),必须使用 ConsumerSeekAware 更复杂一点。
另一种选择是添加重试,以便根据重试设置重新尝试传递。

iibxawm4

iibxawm42#

显然,如果我们希望spring-kafka通过重试和“不再进行轮询”来自动(至少记录)处理此问题,那么spring-kafka<=1.3.3@kafkalistener会丢失消息,即使您使用ackonerror=false也是如此。默认的行为是只记录日志。
即使使用SpringKafka1.3.3.release(没有maven源代码)和单个分区主题(concurrency(1)、ackonerror(false)、batchlistener(true)以及ackmode(batch)处理任何运行时异常,我们也能够在使用者上重现消息丢失/跳过。我们最终在模板内重试或探索ConsumerSekAware。
@garyrussell,关于“broker不会再次发送相同的记录”或继续返回下一批未提交的消息?这是因为,消费者轮询是基于它希望获得下一批记录的当前偏移量,而不是基于提交的上一个偏移量?基本上,使用者根本不需要提交(假设在每个处理上都有一些运行时异常),而是继续使用主题上的整个消息。只有从上次提交的偏移量(重复)开始重新启动。
升级到2.0+以使用consumerawarelistenererrorhandler似乎至少需要升级到Spring5.x,这是一个主要的升级。

相关问题