java—是否有任何方法可以在maxattempts时间内使用acknowledge.nack for spring cloud stream binder kafka重试?

wko9yo5t  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(357)

我正在尝试使用kafka中的消费批处理,我发现文档中说retry不受支持,如下所示。
使用批处理模式时不支持在绑定器中重试,因此maxtempts将被重写为1。您可以配置seektocurrentbatcherrorhandler(使用listenercontainercustomizer)来实现类似的功能,以便在活页夹中重试。您还可以使用手动ackmode并调用ackowledgement.nack(index,sleep)来提交部分批处理的偏移量,并重新传递剩余的记录。有关这些技术的更多信息,请参阅springforapachekafka文档
如果我使用acknowledge.nack(index,sleep),当错误发生时它将无限重试。是否有任何方法可以在maxattempts时间内使用acknowledge.nack重试?
代码就像

@StreamListener(Sink.INPUT)
    public void consume(@Payload List<PayLoad> payloads, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

        try {
            consume(payloads);
            acknowledgment.acknowledge();
        } catch (Exception e) {
            acknowledgment.nack(0, 50);

        }

    }
j2datikz

j2datikz1#

没有;你必须自己记录重试次数。
但是,从版本2.5开始,您现在可以使用 RecoveringBatchErrorHandler 在这里,抛出一个特定的异常来告诉处理程序哪个记录失败了,处理程序在该异常之前提交记录的偏移量,并对失败的记录应用重试逻辑。
看到了吗https://docs.spring.io/spring-kafka/reference/html/#recovering-批次eh

相关问题