java 如何配置DeadLetterPublisherRecoverer以在Spring云流批处理模式下向DLQ发送错误消息

vbkedwbf  于 2023-03-11  发布在  Java
关注(0)|答案(2)|浏览(177)

我已经使用Spring Cloud Stream和Spring Cloud Function创建了一个Kafka Consumer,用于以批处理模式使用Kafka主题中的消息。现在,我想将错误批处理发送到死信队列,以便进一步调试错误。
我正在用Spring retry处理消费者方法中的重试,但是对于不可重试的异常,我希望将整个批处理发送到DLQ。
这是我的消费者的样子:

@Bean
public Consumer<List<GenericRecord>> consume() {
    return (message) -> {
        processMessage(message);  
    }
}

以下是错误处理配置的外观:

@Autowired
private DefaultErrorHandler errorHandler;

ListenerContainerCustomizer<AbstractMessageListenerContainer> c = new ListenerContainerCustomizer<AbstractMessageListenerContainer>() {
      @Override
      public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
        container.setCommonErrorHandler(errorHandler);
      }
}

使用DeadRecordPublishinRecoverer启用错误处理程序,以将失败消息发送到DLQ:

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, Details> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
        (cr, e) -> new TopicPartition("error.topic.name", 0)),
        new FixedBackOff(0, 0));
}

但这并没有向error.topic发送任何消息,从错误日志中我可以看到它正在尝试连接到localhost:9092,而不是我在spring.cloud.stream.kafka.binder.brokers中提到的代理。
如何配置DLQ提供程序从application.properties读取Kafka元数据?
另外,是否有办法配置Supplier函数来创建DLQ提供程序?

z0qdvdin

z0qdvdin1#

您可能正在使用 Boot 的自动配置KafkaTemplate
使用spring.kafka.bootstrap-servers代替-如果没有spring.cloud.stream.kafka.binder.brokers,则绑定器将使用它;这样,绑定器和模板都将连接到同一代理。
您必须抛出一个BatchListenerFailedException来指示批处理中的哪个记录失败。

yizd12fk

yizd12fk2#

DefaultErrorHandler将消息返回到原始主题。您希望覆盖它。

DeadLetterPublishingRecoverer dltRecoverer = new DeadLetterPublishingRecoverer(dltTopicTemplate);

    return new DefaultErrorHandler(){
        @Override
        public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            dltRecoverer.accept(records.get(0), thrownException);
        }
    };

相关问题