rabbitmq 在Spring AMQP中自动否定应答后,将消息重新发布到具有已更新标头的同一队列

00jrzges  于 2022-11-23  发布在  RabbitMQ
关注(0)|答案(1)|浏览(190)

我正在尝试配置我的SpringAMQPListenerContainer,以允许某种类型的重试流,该重试流与我正在处理的项目中以前使用的自定义rabbit客户端向后兼容。
协议工作方式如下:
1.在信道上接收消息。
1.如果处理失败,消息将被否定,并将重新发布标志设置为false
1.将具有附加/更新的标头(重试计数器)的消息副本发布到同一队列
标头用于过滤传入的消息,但这在这里并不重要。
我希望这种行为是在选择性加入的基础上发生的,这样在与旧客户机的兼容性不是问题的情况下,可以使用更标准化的Spring重试流,并且侦听器应该能够在不需要手动备份的情况下工作。
我已经实现了一个可行的解决方案,我将在下面回到这个解决方案。我正在努力的地方是,在向容器发出它应该nack当前消息的信号之后发布新消息,因为我真的找不到任何在nack之后或在下一个消息之前的好钩子。
阅读文档时,我感觉我在寻找类似于RepublishMessageRecoverer的行为,它被用作重试拦截器的最后一步。在我的案例中,主要的区别是我需要在失败时立即重新发布,而不是作为最后的恢复步骤。我试图查看RepublishMessageRecoverer的实现,但是许多层的间接使我很难理解重新发布是在哪里触发的,以及在此之前是否有nack。
我的工作实现如下所示。注意,我使用的是AfterThrowsAdvice,但我认为错误处理程序也可以使用几乎相同的逻辑。

/*
MyConfig.class, configuring the container factory
*/
@Configuration
public class MyConfig {
    @Bean
    // NB: bean name is important, overwrites autoconfigured bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            Jackson2JsonMessageConverter messageConverter,
            RabbitTemplate rabbitTemplate
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter);

        // AOP
        var a1 = new CustomHeaderInspectionAdvice();

        var a2 = new MyThrowsAdvice(rabbitTemplate);

        Advice[] adviceChain = {a1, a2};

        factory.setAdviceChain(adviceChain);

        return factory;
    }
}

/*
MyThrowsAdvice.class, hooking into the exception flow from the listener
*/
public class MyThrowsAdvice implements ThrowsAdvice {

    private static final Logger logger = LoggerFactory.getLogger(MyThrowsAdvice2.class);
    private final AmqpTemplate amqpTemplate;

    public MyThrowsAdvice2(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    public void afterThrowing(Method method, Object[] args, Object target, ListenerExecutionFailedException ex) {
        var message = message(args);
        var cause = ex.getCause();

        // opt-in to old protocol by throwing an instance of BusinessException in business logic
        if (cause instanceof BusinessException) {
            /* 
            NB: Since we want to trigger execution after the current method fails
            with an exception we need to schedule it in another thread and delay
            execution until the nack has happened. 
            */
            new Thread(() -> {
                try {
                    Thread.sleep(1000L);
                    var messageProperties = message.getMessageProperties();
                    var count = getCount(messageProperties);
                    messageProperties.setHeader("xb-count", count + 1);
                    var routingKey = messageProperties.getReceivedRoutingKey();
                    var exchange = messageProperties.getReceivedExchange();
                    amqpTemplate.send(exchange, routingKey, message);
                    logger.info("Sent!");
                } catch (InterruptedException e) {
                    logger.error("Sleep interrupted", e);
                }
            }).start();
            // NB: Produce the desired nack.
            throw new AmqpRejectAndDontRequeueException("Business logic exception, message will be re-queued with updated headers", cause);
        }
    }

    private static long getCount(MessageProperties messageProperties) {
        try {
            Long c = messageProperties.getHeader("xb-count");
            return c == null ? 0 : c;
        } catch (Exception e) {
            return 0;
        }
    }

    private static Message message(Object[] args) {
        try {
            return (Message) args[1];
        } catch (Exception e) {
            logger.info("Bad cast parse", e);
            throw new AmqpRejectAndDontRequeueException(e);
        }
    }

}

现在,正如您所想象的,我对调度一个具有延迟的新线程的不确定性并不特别满意。
所以我的问题很简单,有没有办法使用ListenerContainer提供的钩子来产生一个确定性的解决方案?

vdzxcuhz

vdzxcuhz1#

您当前的解决方案存在邮件丢失的风险;因为您是在延迟后在不同的线程上发布的。如果服务器在延迟期间崩溃,消息将丢失。
最好立即发布到另一个具有TTL和死信配置的队列,以便将过期消息重新发布回原始队列。
使用RepublishMessageRecoverer并将重试次数设置为maxattempts=1应该可以满足您的需要。

相关问题