java—如何在使用SpringKafka框架时处理错误/异常?

vltsax25  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(339)

我找不到如何在中为SpringKafka消费者执行自定义错误处理。
我的要求是:
对于任何反序列化错误,只需将错误和消息写入数据库。
执行中的任何错误 @KafkaListener 方法,重试3次,然后将错误和消息写入数据库。
从spring文档中,我发现,对于1,我必须使用 ErrorHandlingDeserializer 然后它将调用@kafkalistener错误处理程序。对于2,框架提供 SeekToCurrentErrorHandler 它处理消息重试。
我不明白除了启用配置的重试之外,在哪里可以添加代码来将异常/消息写入数据库。

raogr8fs

raogr8fs1#

将恢复程序添加到 SeekToCurrentErrorHandler ```
new SeekToCurrentErrorHandler((rec, ex) -> {
Throwable cause = ex.getCause();
if (cause instanceof DeserializationException) {
...
}
else {
...
}, new FixedBackOff(2000L, 2L));

默认情况下,不重试反序列化异常;在调用恢复程序之前,将重试大多数其他操作。

相关问题