Kafka 在反序列化异常的情况下写入死信主题

h5qlskok  于 2023-06-21  发布在  Apache
关注(0)|答案(1)|浏览(130)

我有一个Spring Boot应用程序,它有一个带有@KafkaListener的简单Consumer。我有阻塞重试逻辑,可以按预期工作,但在反序列化异常的情况下,我想将损坏的消息存储在DLT主题中,以便稍后手动分析和处理。
我使用DefaultErrorHandler,在反序列化异常的情况下,我的代码进入这个块而没有任何重试尝试,这是预期的行为。但是,问题出在我的errorHandler中的consumerRecord参数。我没有在consumerRecord中获得消息,因此消息在DLT中保存为null。我想以某种方式保留损坏的消息并将其发布到DLT。

DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
        alerter.alert(exception,"Message Consumption Failed : " + consumerRecord.value() + " , sent to dead letter topic .");
        publisherService.sendMessageToDlq(DLQ_TOPIC, consumerRecord.value());}, fixedBackOff);
    • consumerRecord. value()**为null:(
pdkcd3nj

pdkcd3nj1#

假设您正在使用ErrorHandlingDeserializer,请参阅框架的DeadLetterPublishingRecoverer中的逻辑,了解它如何从头部(其中包含原始数据作为属性)获取DeserializationException
https://github.com/spring-projects/spring-kafka/blob/54601b1e829a3e9697b4933557a9d6bb16a7de7d/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L474-L481

相关问题