我找不到如何在中为SpringKafka消费者执行自定义错误处理。
我的要求是:
对于任何反序列化错误,只需将错误和消息写入数据库。
执行中的任何错误 @KafkaListener
方法,重试3次,然后将错误和消息写入数据库。
从spring文档中,我发现,对于1,我必须使用 ErrorHandlingDeserializer
然后它将调用@kafkalistener错误处理程序。对于2,框架提供 SeekToCurrentErrorHandler
它处理消息重试。
我不明白除了启用配置的重试之外,在哪里可以添加代码来将异常/消息写入数据库。
1条答案
按热度按时间raogr8fs1#
将恢复程序添加到
SeekToCurrentErrorHandler
```new SeekToCurrentErrorHandler((rec, ex) -> {
Throwable cause = ex.getCause();
if (cause instanceof DeserializationException) {
...
}
else {
...
}, new FixedBackOff(2000L, 2L));