Java: dead letter queue does not receive messages after retries are exhausted

oxf4rvwz  posted on 2023-04-28  in Java

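After the retry limit is reached, the message is not pushed to the DLQ. The message is also removed from the main queue, and the logs are empty once the retries are done. The retries themselves do run up to the configured limit, but after that nothing happens. I also tried throwing AmqpRejectAndDontRequeueException from the consumer and still face the same problem: the DLQ never receives any message.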

    @Bean
    Queue testMsgDLQ() {
        return QueueBuilder.durable(testDLQ).build();
    }

    @Bean
    DirectExchange testMsgDLX() {
        return new DirectExchange(testDLX);
    }

    @Bean
    Binding testDLBinding(Queue testMsgDLQ,DirectExchange testMsgDLX) {
        return BindingBuilder.bind(testMsgDLQ)
                .to(testMsgDLX).with(testDLQ);
    }

    @Bean
    Queue testMsgQueue() {
        return QueueBuilder.durable(testQueue)
                .withArgument("x-dead-letter-exchange", testDLX)
                .withArgument("x-dead-letter-routing-key",testDLQ).build();
    }

    @Bean
    DirectExchange testMsgExchange() {
        return new DirectExchange(testExchange);
    }

    @Bean
    Binding testMessageQueue(Queue testMsgQueue,DirectExchange testMsgExchange) {
        return BindingBuilder.bind(testMsgQueue)
                .to(testMsgExchange).with(testQueue);
    }

**Configuration**
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(host);
        cachingConnectionFactory.setPort(port);
        cachingConnectionFactory.setUsername(username);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setAddresses(address);
        cachingConnectionFactory.setUri(url);
/*        cachingConnectionFactory.setPublisherReturns(true);
        cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);*/
        cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        cachingConnectionFactory.setRequestedHeartBeat(10);

        return cachingConnectionFactory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(concurrentConsumers);
        factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
        factory.setDefaultRequeueRejected(false);
        factory.setAdviceChain(RetryInterceptorBuilder.stateless()
                .recoverer(new RejectAndDontRequeueRecoverer())
                .maxAttempts(5)
                .backOffOptions(2000, 2.0, 10000) // initialInterval, multiplier, maxInterval
                .build());
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory publisherConnectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(publisherConnectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

**Consumer**
    @RabbitListener(containerFactory="rabbitListenerContainerFactory", queues = "${rabbitmq.email.queue.name}")
    public void receiveMessage(MessageKeyVo messageId) {

        System.out.println("Demo");

        throw new RuntimeException("q");

    }
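
As a side note on the retry settings in the configuration above: `maxAttempts(5)` together with `backOffOptions(2000, 2.0, 10000)` implies four waits between the five attempts. The standalone sketch below works out that schedule. It assumes exponential back-off in which each wait is the previous one multiplied by the multiplier, capped at the maximum interval, with no wait after the final attempt. `BackoffSchedule` and `waits` are hypothetical names used only for illustration; this is not Spring Retry code.

```java
import java.util.Arrays;

public class BackoffSchedule {

    /**
     * Returns the waits (in ms) between attempts, assuming capped exponential back-off.
     * There is one wait fewer than attempts, because nothing is waited after the last one.
     */
    public static long[] waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        long[] waits = new long[Math.max(0, maxAttempts - 1)];
        double next = initialInterval;
        for (int i = 0; i < waits.length; i++) {
            waits[i] = (long) Math.min(next, maxInterval); // cap at maxInterval
            next *= multiplier;                            // grow for the following wait
        }
        return waits;
    }

    public static void main(String[] args) {
        long[] waits = waits(5, 2000, 2.0, 10000);
        System.out.println(Arrays.toString(waits));                    // [2000, 4000, 8000, 10000]
        System.out.println(Arrays.stream(waits).sum() + " ms total");  // 24000 ms total
    }
}
```

If that assumption holds, the listener only gives up about 24 seconds after the first delivery, so a look at the DLQ before then would be expected to find it empty.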
yrefmtwq1#

It is not clear what is wrong with your code, but the feature itself works fine:

Below is the Spring Boot code I used to test against a locally running RabbitMQ.
In application.properties I have only this:

spring.rabbitmq.listener.simple.retry.enabled=true

This enables retry on the listener container with 3 attempts and RejectAndDontRequeueRecoverer as the default recovery strategy. See AbstractRabbitListenerContainerFactoryConfigurer for details.
The code looks like this:

@SpringBootApplication
public class So76100648Application {

    public static void main(String[] args) {
        SpringApplication.run(So76100648Application.class, args);
    }

    private final String testDLQ = "testDLQ";

    private final String testDLX = "testDLX";

    private final String testQueue = "testQueue";

    private final String testExchange = "testExchange";

    @Bean
    Queue testMsgDLQ() {
        return QueueBuilder.durable(testDLQ).build();
    }

    @Bean
    DirectExchange testMsgDLX() {
        return new DirectExchange(testDLX);
    }

    @Bean
    Binding testDLBinding(Queue testMsgDLQ, DirectExchange testMsgDLX) {
        return BindingBuilder.bind(testMsgDLQ)
                .to(testMsgDLX).with(testDLQ);
    }

    @Bean
    Queue testMsgQueue() {
        return QueueBuilder.durable(testQueue)
                .withArgument("x-dead-letter-exchange", testDLX)
                .withArgument("x-dead-letter-routing-key", testDLQ).build();
    }

    @Bean
    DirectExchange testMsgExchange() {
        return new DirectExchange(testExchange);
    }

    @Bean
    Binding testMessageQueue(Queue testMsgQueue, DirectExchange testMsgExchange) {
        return BindingBuilder.bind(testMsgQueue)
                .to(testMsgExchange).with(testQueue);
    }

    @RabbitListener(queues = testQueue)
    public void testListener(Object payload) {
        throw new RuntimeException("intentional");
    }

    @Bean
    ApplicationRunner applicationRunner(RabbitTemplate rabbitTemplate) {
        return args -> rabbitTemplate.convertAndSend(testExchange, testQueue, "test data");
    }

}

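So I have exactly the same AMQP objects as in your code, plus a @RabbitListener that simply throws an exception when a message arrives.
After the message is sent and received, the logs show: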

2023-04-25T09:36:55.827-04:00  WARN 12456 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
    ...
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
    ... 21 common frames omitted
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.so76100648.So76100648Application.testListener(java.lang.Object)' threw exception
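
The nesting in this stack trace reads from the inside out: the listener failure is wrapped in a reject-without-requeue signal once the attempts are used up, and that signal is wrapped in "Retry Policy Exhausted". A plain-Java sketch of that flow follows. `RetryFlowSketch`, `RejectAndDontRequeue` and `deliver` are hypothetical stand-ins written for illustration, not the Spring AMQP classes, and back-off waits are left out.

```java
import java.util.function.Consumer;

public class RetryFlowSketch {

    /** Hypothetical stand-in for a "reject, do not requeue" signal; not the Spring class. */
    public static class RejectAndDontRequeue extends RuntimeException {
        public RejectAndDontRequeue(Throwable cause) {
            super(cause);
        }
    }

    /** Number of listener invocations made by the most recent call to deliver(). */
    public static int attempts;

    /**
     * Invokes the listener up to maxAttempts times. Once the attempts are used up,
     * a recoverer-like step wraps the last failure so the caller can reject the message.
     */
    public static void deliver(String payload, Consumer<String> listener, int maxAttempts) {
        attempts = 0;
        RuntimeException last = null;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                listener.accept(payload);
                return; // success: the message would be acknowledged
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw new RuntimeException("Retry Policy Exhausted", new RejectAndDontRequeue(last));
    }

    public static void main(String[] args) {
        try {
            deliver("test data", p -> { throw new RuntimeException("intentional"); }, 3);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " after " + attempts + " attempts");
            System.out.println("cause: " + e.getCause().getClass().getSimpleName());
            System.out.println("root: " + e.getCause().getCause().getMessage());
        }
    }
}
```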

Everything indicates that this works as expected.
Please share a simple, minimal project that reproduces the problem.
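
For readers comparing their own setup against the AMQP objects declared above, the broker-side step can be modelled in a few lines of plain Java. The sketch below is a toy, not RabbitMQ or Spring code: `DeadLetterRoutingSketch` and its methods are hypothetical, only direct exchanges are modelled, and a missing `x-dead-letter-routing-key` is treated as unroutable (a real broker falls back to the message's original routing key). It shows that a message rejected without requeue reaches the DLQ only when the source queue carries the dead-letter arguments and the DLQ binding key matches; otherwise the message is simply discarded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeadLetterRoutingSketch {

    // exchange name -> (binding key -> queue name); direct exchanges only
    private final Map<String, Map<String, String>> bindings = new HashMap<>();
    // queue name -> arguments the queue was declared with
    private final Map<String, Map<String, String>> queueArgs = new HashMap<>();
    // queue name -> messages currently held
    private final Map<String, List<String>> queues = new HashMap<>();

    public void declareQueue(String name, Map<String, String> args) {
        queueArgs.put(name, args);
        queues.put(name, new ArrayList<>());
    }

    public void bind(String queue, String exchange, String key) {
        bindings.computeIfAbsent(exchange, e -> new HashMap<>()).put(key, queue);
    }

    /** Routes by exact key match; an unroutable message is dropped. */
    public void publish(String exchange, String routingKey, String message) {
        Map<String, String> byKey = bindings.get(exchange);
        String target = (byKey == null) ? null : byKey.get(routingKey);
        if (target != null) {
            queues.get(target).add(message);
        }
    }

    /** Removes the head message and dead-letters it if the queue has a DLX configured. */
    public void rejectWithoutRequeue(String queue) {
        String message = queues.get(queue).remove(0);
        Map<String, String> args = queueArgs.get(queue);
        String dlx = args.get("x-dead-letter-exchange");
        if (dlx == null) {
            return; // no dead-letter exchange: the message is discarded
        }
        publish(dlx, args.get("x-dead-letter-routing-key"), message);
    }

    public List<String> messagesIn(String queue) {
        return queues.get(queue);
    }

    public static void main(String[] args) {
        DeadLetterRoutingSketch broker = new DeadLetterRoutingSketch();
        broker.declareQueue("testDLQ", Map.of());
        broker.bind("testDLQ", "testDLX", "testDLQ");
        broker.declareQueue("testQueue", Map.of(
                "x-dead-letter-exchange", "testDLX",
                "x-dead-letter-routing-key", "testDLQ"));
        broker.bind("testQueue", "testExchange", "testQueue");

        broker.publish("testExchange", "testQueue", "test data");
        broker.rejectWithoutRequeue("testQueue");
        System.out.println(broker.messagesIn("testDLQ")); // [test data]
    }
}
```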
