我使用的是spring Cloud stream Kafka绑定器,我们在其中设置了两个消费者绑定,两个消费者都是我们在应用程序中启动的。我们使用消费者批处理模式,批处理大小为100。我们设置了2个不同的DeadLetterPublishingRecoverer Bean,这样我们就可以有两个不同的死信主题,每个消费者都会在错误时发送消息到不同的错误主题。
由于我已经设置了2个不同的DeadLetterPublishingRecoverer bean,在消息消费错误时,我可以看到Spring总是使用一个DeadLetterPublishingRecoverer bean,并且总是将消息发送到相同的错误主题。我想要的是,如果Consumer1正在处理消息,并且发生了错误,那么Spring将向Error topic1发送消息,并且当Consumer2在处理消息时失败时,那么Spring应该向Error topic2发送消息。我试着像下面这样创建我的自定义BatchErrorHandler,但它不起作用。任何建议,我如何可以设置不同的DeadLetterPublishingRecoverer为每个消费者?
@Component
public class CustomBatchErrorRecovererHandler implements BatchErrorHandler {
private DeadLetterPublishingRecoverer deadLetterPublishingRecoverer1 ;
private DeadLetterPublishingRecoverer deadLetterPublishingRecoverer2 ;
public CustomBatchErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer1 , DeadLetterPublishingRecoverer deadLetterPublishingRecoverer2 ) {
this.deadLetterPublishingRecoverer1 = deadLetterPublishingRecoverer1 ;
this.deadLetterPublishingRecoverer2 = deadLetterPublishingRecoverer2 ;
}
@Override
public void handle(Exception exception, ConsumerRecords<?, ?> records) {
records.foreach(record -> {
String key = (String) record.key();
if(key.equalsIgnoreCase("Topic1")){
deadLetterPublishingRecoverer1.accept(record,exception);
}else if(key.equalsIgnoreCase("Topic2")){
deadLetterPublishingRecoverer2.accept(record,exception);
}
});
}
}
1条答案
按热度按时间xlpyo6sf1#
可以为单个
DeadLetterPublishingRecoverer
提供目标解析器BiFunction<ConsumerRecord, Exception, TopicPartition
。参见文档。https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters