我有一个Spring Boot应用程序,它有一个带有@KafkaListener的简单Consumer。我有阻塞重试逻辑,可以按预期工作,但在反序列化异常的情况下,我想将损坏的消息存储在DLT主题中,以便稍后手动分析和处理。
我使用DefaultErrorHandler,在反序列化异常的情况下,我的代码进入这个块而没有任何重试尝试,这是预期的行为。但是,问题出在我的errorHandler中的consumerRecord参数。我没有在consumerRecord中获得消息,因此消息在DLT中保存为null。我想以某种方式保留损坏的消息并将其发布到DLT。
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
alerter.alert(exception,"Message Consumption Failed : " + consumerRecord.value() + " , sent to dead letter topic .");
publisherService.sendMessageToDlq(DLQ_TOPIC, consumerRecord.value());}, fixedBackOff);
- consumerRecord. value()**为null:(
1条答案
按热度按时间pdkcd3nj1#
假设您正在使用
ErrorHandlingDeserializer
,请参阅框架的DeadLetterPublishingRecoverer
中的逻辑,了解它如何从头部(其中包含原始数据作为属性)获取DeserializationException
。https://github.com/spring-projects/spring-kafka/blob/54601b1e829a3e9697b4933557a9d6bb16a7de7d/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L474-L481