我用的是Spring的 SeekToCurrentErrorHandler
与 DeadLetterPublishingRecoverer
.
每次调用错误处理程序时都会记录一条消息,同时还会向监控系统发送一个事件。
我看到的问题是,对于连续的记录调用错误处理程序的次数太多,导致失败。
例如,对于不可重试的异常,如果生成2个(错误)msg,则会得到以下日志:
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
对于我得到的x consumer error msg,似乎是:x调用错误处理程序,x-1调用错误处理程序,x-2调用,等等。
对于可重试的异常也是如此,对于每个重试我都看到相同的异常。正确调用consumer函数后,只有错误处理程序触发次数过多。
这是我的错误处理配置:
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
private final Monitor monitor;
CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
super(dlpr, retries);
super.setLogLevel(KafkaException.Level.DEBUG);
this.monitor = monitor;
}
@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
if (!records.isEmpty()) {
records.forEach(record -> {
logAndReportError(record);
});
}
super.handle(exception, records, consumer, container);
}
}
@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
seekToCurrent.addNotRetryableException(SomeFatalException.class);
return seekToCurrent;
}
我有两个问题:
为什么错误处理程序会触发这么多次?
为什么对不可重试的异常执行seek?
1条答案
按热度按时间axzmvihb1#
你的问题不清楚;请添加您的代码和配置。
我们必须在失败的那次投票后寻找记录,以便在下次投票时重新提交。