我有一个消费者应用程序,它监听一个kafka主题,在异常情况下,它会按预期将记录发送到dlq(死信队列)。
在抛出异常iam设置使用者应用程序中的标头之前,它正在设置,但不会发送到死信主题,死信主题只有内置的标头,如“x-exception-message”、“x-original-partition”等。。而不是我设定的那个。
下面是我的消费者应用程序中的一段代码:
modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();
System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100
throw new RuntimeException(modifiedMessage.toString());
注意:iam在spring.cloud.stream.kafka.binder.header=x-ecode下的my application.yml中设置x-code
在上面的代码中,我可以设置头,并实际验证它是否已设置,但它没有发送到死信主题。。
有效负载发送正确,如何将该头发送到死信主题?我是否需要在application.yml中添加任何属性来启用它?
1条答案
按热度按时间bttbmeg01#
这里所做的只是创建一条新消息并丢弃它(除了将异常消息设置为其字符串实现之外)。
这不会修改原始入站邮件。
您只需添加一个新的输出绑定,然后自己将修改后的消息发送给它。
或者,如果只添加常量,则可以添加
ChannelInterceptor
到绑定的错误通道并在那里修改消息。如果需要将状态传递给拦截器,可以使用自定义异常。然而,最简单的解决方案是自己发布消息,而不是使用binder的dlq机制。您已经有了创建消息的逻辑,因此
myCustomDlqBinding.send(modifiedMessage)
(还添加了标准的dlq头)。