Spring Kafka消费错误处理后的佣金补偿

fkaflof6  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(367)

我不完全了解消费者错误处理如何与提交偏移量和akcmode一起工作,以及它如何受到错误时停止容器的影响(使用springkafka1.3.*)。
假设我有两个使用者(使用两个分区),它们在轮询时都从各自的分区获取5个事件( max.records.per.poll=5 ).
第一个使用者-第一个事件处理正常,第二个事件处理失败-因此在错误处理程序中我调用 kafkaListenerEndpointRegistry.stop() ,但由于stop的实现只是阻止消费者进行轮询,所以两个消费者仍然完成了当前批处理。因此,第一个使用者处理事件3、4、5(所有这些都处理得没有错误),假设第二个使用者在第四个事件中失败(事件1、2、3、5处理得很好)。我的问题是,将为每个消费者承诺哪些补偿?
我的理解是:
当我使用 AckMode.RECORD/BATCH 与…结合 ackOnError -将为两个消费者承诺最新补偿(5)
当我使用 AckMode.RECORD/BATCH 与…结合 !ackOnError -另外,最新的偏移量将提交给两个使用者-因为尽管在批处理过程中某些事件失败,但批处理中最新处理的事件正常,因此最新处理的事件偏移量获胜。
我的理解正确吗?

a14dhokn

a14dhokn1#

你的理解是正确的;当您停止容器时,您还应该向侦听器发出信号,表示它也需要拒绝任何剩余的记录,以便它们的偏移量不会被提交。
我们正在考虑增加一个 stopNow() 方法,这将阻止向侦听器发送其他记录。
在2.0中,我们添加了 RemainingRecordsErrorHandler (以及实施方案, SeekToCurrentErrorHandler ). 当容器检测到这样的错误处理程序时,它会将剩余的记录呈现给错误处理程序,而不是侦听器。
这个 SeekToCurrentErrorHandler 将所有主题/分区查找到未处理的偏移量(包括失败的记录),以便在下一次轮询时检索它们。
自定义实现可能会查找剩余的记录,但会将失败的记录发送到死信主题(或以其他方式处理它)。
也就是说, stopNow() 对大多数人来说可能更容易处理,但它可能只是2.2的特性;1.3.x用户在失败后需要丢弃/拒绝未处理的记录。
你也可以用 RetryingMessageListenerAdapter (或启用重试,如果使用 @KafkaListener )它将根据重试配置重试传递,而不涉及kafka。失败的记录可以通过 RecoveryCallback 在重试已用尽并提交其偏移量之后;在这种情况下,容器不需要停止。

相关问题