Spring云流Kafka Binder多DeadLetterPublishingRecoverer支持

ldfqzlk8  于 2023-06-22  发布在  Spring
关注(0)|答案(1)|浏览(129)

我使用的是spring Cloud stream Kafka绑定器,我们在其中设置了两个消费者绑定,两个消费者都是我们在应用程序中启动的。我们使用消费者批处理模式,批处理大小为100。我们设置了2个不同的DeadLetterPublishingRecoverer Bean,这样我们就可以有两个不同的死信主题,每个消费者都会在错误时发送消息到不同的错误主题。
由于我已经设置了2个不同的DeadLetterPublishingRecoverer bean,在消息消费错误时,我可以看到Spring总是使用一个DeadLetterPublishingRecoverer bean,并且总是将消息发送到相同的错误主题。我想要的是,如果Consumer1正在处理消息,并且发生了错误,那么Spring将向Error topic1发送消息,并且当Consumer2在处理消息时失败时,那么Spring应该向Error topic2发送消息。我试着像下面这样创建我的自定义BatchErrorHandler,但它不起作用。任何建议,我如何可以设置不同的DeadLetterPublishingRecoverer为每个消费者?

@Component
public class CustomBatchErrorRecovererHandler implements BatchErrorHandler {

    private DeadLetterPublishingRecoverer deadLetterPublishingRecoverer1 ;
    private DeadLetterPublishingRecoverer deadLetterPublishingRecoverer2 ;

    public CustomBatchErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer1 , DeadLetterPublishingRecoverer deadLetterPublishingRecoverer2 ) {
        this.deadLetterPublishingRecoverer1 = deadLetterPublishingRecoverer1 ;
        this.deadLetterPublishingRecoverer2 = deadLetterPublishingRecoverer2 ;
    }

    @Override
    public void handle(Exception exception, ConsumerRecords<?, ?> records) {
        records.foreach(record -> {
         String key = (String) record.key();
         if(key.equalsIgnoreCase("Topic1")){
               deadLetterPublishingRecoverer1.accept(record,exception);
           }else if(key.equalsIgnoreCase("Topic2")){
               deadLetterPublishingRecoverer2.accept(record,exception);
           }
        });
    }
}
xlpyo6sf

xlpyo6sf1#

可以为单个DeadLetterPublishingRecoverer提供目标解析器BiFunction<ConsumerRecord, Exception, TopicPartition
参见文档。https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

相关问题