Spring Boot版本:2.7.6 Spring Kafka版:2.8.11
发布日期:
我试图在代码中处理反序列化问题。为了在代码中处理此类问题,我创建了自己的类,方法是将
DefaultErrorHandler
并重写public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {}
示例代码,如下所示
public class CustomDefaultErrorHandler extends DefaultErrorHandler {
private static Logger log = LoggerFactory.getLogger(CustomDefaultErrorHandler.class);
@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
manageException(thrownException, consumer);
}
private void manageException(Exception ex, Consumer<?, ?> consumer) {
log.error("Error polling message: " + ex.getMessage());
if (ex instanceof RecordDeserializationException) {
RecordDeserializationException rde = (RecordDeserializationException) ex;
consumer.seek(rde.topicPartition(), rde.offset() + 1L);
consumer.commitSync();
} else {
log.error("Exception not handled");
}
}
}
如果我将@RetryableTopic与@KafkaListener一起使用
@RetryableTopic(listenerContainerFactory = "kafkaListenerContainerFactory", backoff = @Backoff(delay = 8000, multiplier = 2.0),
dltStrategy = DltStrategy.FAIL_ON_ERROR
, traversingCauses = "true", autoCreateTopics = "true", numPartitions = "3", replicationFactor = "3",
fixedDelayTopicStrategy = FixedDelayStrategy.MULTIPLE_TOPICS, include = {RetriableException.class, RecoverableDataAccessException.class,
SQLTransientException.class, CallNotPermittedException.class}
)
@KafkaListener(topics = "${topic.name}", groupId = "order", containerFactory = "kafkaListenerContainerFactory", id = "OTR")
public void consumeOTRMessages(ConsumerRecord<String, PayloadsVO> payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName) throws JsonProcessingException {
logger.info("Payload :{}", payload.value());
payloadsService.savePayload(payload.value(), pegasusTopicName);
}
我在调试代码时看到,@RetryableTopic在中有自己的DefaultErrorHandler配置
ListenerContainerFactoryConfigurer
并且它停止了我的自定义处理程序,反序列化过程不会停止。
既然我想在代码中使用注解来进行重试,您能给我一些建议吗
我尝试配置自己的
DefaultErrorHandler
通过将其延伸并配置在
ConcurrentKafkaListenerContainerFactory
1条答案
按热度按时间h7appiyu1#
这非常复杂,但是您应该能够覆盖
RetryTopicComponentFactory
bean和listenerContainerFactoryConfigurer()
以返回您的自定义错误处理程序。也就是说,反序列化异常无论如何都会直接进入DLT。
顺便说一句,在这里调用
commitSync()
是没有价值的,因为poll()
没有返回任何记录。