我已经使用Spring Cloud Stream和Spring Cloud Function创建了一个Kafka Consumer,用于以批处理模式使用Kafka主题中的消息。现在,我想将错误批处理发送到死信队列,以便进一步调试错误。
我正在用Spring retry处理消费者方法中的重试,但是对于不可重试的异常,我希望将整个批处理发送到DLQ。
这是我的消费者的样子:
@Bean
public Consumer<List<GenericRecord>> consume() {
return (message) -> {
processMessage(message);
}
}
以下是错误处理配置的外观:
@Autowired
private DefaultErrorHandler errorHandler;
ListenerContainerCustomizer<AbstractMessageListenerContainer> c = new ListenerContainerCustomizer<AbstractMessageListenerContainer>() {
@Override
public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
container.setCommonErrorHandler(errorHandler);
}
}
使用DeadRecordPublishinRecoverer启用错误处理程序,以将失败消息发送到DLQ:
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, Details> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(cr, e) -> new TopicPartition("error.topic.name", 0)),
new FixedBackOff(0, 0));
}
但这并没有向error.topic发送任何消息,从错误日志中我可以看到它正在尝试连接到localhost:9092,而不是我在spring.cloud.stream.kafka.binder.brokers
中提到的代理。
如何配置DLQ提供程序从application.properties
读取Kafka元数据?
另外,是否有办法配置Supplier
函数来创建DLQ提供程序?
2条答案
按热度按时间z0qdvdin1#
您可能正在使用 Boot 的自动配置
KafkaTemplate
。使用
spring.kafka.bootstrap-servers
代替-如果没有spring.cloud.stream.kafka.binder.brokers
,则绑定器将使用它;这样,绑定器和模板都将连接到同一代理。您必须抛出一个
BatchListenerFailedException
来指示批处理中的哪个记录失败。yizd12fk2#
DefaultErrorHandler将消息返回到原始主题。您希望覆盖它。