kafka错误处理程序调用次数过多

m0rkklqb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(331)

我用的是Spring的 SeekToCurrentErrorHandlerDeadLetterPublishingRecoverer .
每次调用错误处理程序时都会记录一条消息,同时还会向监控系统发送一个事件。
我看到的问题是,对于连续的记录调用错误处理程序的次数太多,导致失败。
例如,对于不可重试的异常,如果生成2个(错误)msg,则会得到以下日志:

|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42  [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42  [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication

对于我得到的x consumer error msg,似乎是:x调用错误处理程序,x-1调用错误处理程序,x-2调用,等等。
对于可重试的异常也是如此,对于每个重试我都看到相同的异常。正确调用consumer函数后,只有错误处理程序触发次数过多。
这是我的错误处理配置:

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
    private final Monitor monitor;

    CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
        super(dlpr, retries);
        super.setLogLevel(KafkaException.Level.DEBUG);
        this.monitor = monitor;
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        if (!records.isEmpty()) {
            records.forEach(record -> {
                logAndReportError(record);
            });
        }
        super.handle(exception, records, consumer, container);
    }
}

@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
    var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
   seekToCurrent.addNotRetryableException(SomeFatalException.class);
   return seekToCurrent;
}

我有两个问题:
为什么错误处理程序会触发这么多次?
为什么对不可重试的异常执行seek?

axzmvihb

axzmvihb1#

你的问题不清楚;请添加您的代码和配置。
我们必须在失败的那次投票后寻找记录,以便在下次投票时重新提交。

相关问题