RabbitMQ: RetryInterceptorBuilder does not seem to work

7vux5j2d  posted on 2022-11-08  in RabbitMQ

I am trying to use Spring's retry support. My listener container is defined like this:

@Bean("myListenerContainer")
public SimpleMessageListenerContainer statusCheckcontainer(ConnectionFactory connectionFactory, @Autowired StatusQueueListenerServiceImpl messageService) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

    container.setMicrometerEnabled(false);
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(statusCheckQueueName);
    container.setListenerId(statusCheckQueueName);
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setAutoStartup(true);
    container.setPrefetchCount(6);
    container.setConcurrentConsumers(5);
    container.setAdviceChain(RetryInterceptorBuilder.stateful()
            .backOffOptions(statusCheckQueueInitialInterval, statusCheckQueueMultiplier, statusCheckQueueMaxInterval)
            .maxAttempts(3)
            .keyGenerator(generator -> 1)
            .build());
    container.setMessageListener(new MessageListenerAdapter(messageService, jsonMessageConverter()));

    return container;
}

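I force a RuntimeException in the onMessage method. I expected only 3 retry attempts, but instead the message keeps being retried until its TTL expires (after 3 hours).
Any ideas? Thanks in advance!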

px9o7tmv (answer #1)

You are missing a MessageRecoverer, which has to be set on that interceptor:

/**
 * Callback for message that was consumed but failed all retry attempts.
 *
 * @param message the message to recover
 * @param cause the cause of the error
 */
void recover(Message message, Throwable cause);

https://docs.spring.io/spring-amqp/docs/current/reference/html/#async-listeners
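
To make the role of that callback concrete, here is a small broker-free sketch of the idea behind stateful retry: failures are counted per message id, a failed message is redelivered while attempts remain, and the recoverer runs once they are used up. Everything in it (`StatefulRetryModel`, `Msg`, `simulate`) is hypothetical and written only for illustration. It is not Spring Retry's code, and it simplifies one point: here the recoverer runs as soon as the last allowed attempt fails, whereas the real interceptor may defer that to the next redelivery.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, broker-free model of stateful retry. NOT Spring Retry's implementation.
class StatefulRetryModel {

    /** Hypothetical stand-in for an AMQP message: a unique id plus a body. */
    static final class Msg {
        final String id;
        final String body;

        Msg(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    /** Listener that fails whenever the body is "fail". */
    static void listen(Msg msg, List<String> log) {
        log.add("listen " + msg.body);
        if (msg.body.equals("fail")) {
            throw new RuntimeException("test");
        }
    }

    /** Delivers each message until it succeeds or its attempts are exhausted. */
    static List<String> simulate(int maxAttempts, List<String> bodies) {
        List<String> log = new ArrayList<>();
        Map<String, Integer> failuresById = new HashMap<>(); // retry state, one entry per message id
        Deque<Msg> queue = new ArrayDeque<>();
        int nextId = 0;
        for (String body : bodies) {
            queue.add(new Msg("id-" + nextId++, body));
        }
        while (!queue.isEmpty()) {
            Msg msg = queue.poll();
            try {
                listen(msg, log);
                failuresById.remove(msg.id); // success: forget any retry state
            } catch (RuntimeException e) {
                int failures = failuresById.merge(msg.id, 1, Integer::sum);
                if (failures < maxAttempts) {
                    queue.addFirst(msg); // models the broker redelivering the message
                } else {
                    failuresById.remove(msg.id);
                    // models the recover(message, cause) callback
                    log.add("recover " + msg.body + " (" + e.getMessage() + ")");
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        simulate(3, List.of("fail", "good")).forEach(System.out::println);
    }
}
```

With `maxAttempts` set to 3, the model calls the listener three times for the failing message, hands it to the recoverer once, and then processes the next message normally. Because the state is looked up by message id, each message gets its own attempt counter.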

Edit

Stateful retry example:

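import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So71710515Application {

    public static void main(String[] args) {
        SpringApplication.run(So71710515Application.class, args);
    }

    // Stateful retry has to recognise a redelivered message, so give every
    // outgoing message an id.
    @Bean
    MessageConverter converter() {
        SimpleMessageConverter converter = new SimpleMessageConverter();
        converter.setCreateMessageIds(true);
        return converter;
    }

    @Bean
    ApplicationRunner runner(RabbitTemplate template, SimpleRabbitListenerContainerFactory factory) {
        // Add the retry advice, including a recoverer, to the listener container factory.
        factory.setAdviceChain(RetryInterceptorBuilder.stateful()
                .recoverer((message, cause) -> {
                    // Called once the retry attempts for this message are exhausted.
                    System.out.println(new String(message.getBody()) + " retries exhausted");
                })
                .backOffOptions(1000, 2.0, 5000)
                .maxAttempts(3).build());

        // Send one message that always fails and one that succeeds.
        return args -> {
            template.convertAndSend("foo", "fail");
            template.convertAndSend("foo", "good");
        };
    }

    @Bean
    Queue queue() {
        return new Queue("foo");
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
        System.out.println(in);
        if (in.equals("fail")) {
            throw new RuntimeException("test");
        }
    }

}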
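
The `backOffOptions(1000, 2.0, 5000)` call in the example takes an initial interval, a multiplier, and a maximum interval, all in milliseconds. As a rough illustration of what those numbers mean, the sketch below computes the pause lengths under two assumptions: the usual exponential rule (each pause is the previous one times the multiplier, capped at the maximum) and pauses that happen only between attempts. `BackOffSchedule` and `delays` are hypothetical names, not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the pauses an exponential back-off would produce.
class BackOffSchedule {

    /** Returns the pause (in ms) before each retry, i.e. maxAttempts - 1 values. */
    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        double interval = initialInterval;
        for (int pause = 1; pause < maxAttempts; pause++) {
            delays.add(Math.min((long) interval, maxInterval)); // cap at maxInterval
            interval *= multiplier;                              // grow for the next pause
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(delays(1000, 2.0, 5000, 3)); // prints [1000, 2000]
        System.out.println(delays(1000, 2.0, 5000, 6)); // prints [1000, 2000, 4000, 5000, 5000]
    }
}
```

Under these assumptions, three attempts with the example's settings mean a pause of about one second and then about two seconds; the 5000 ms cap only matters when more attempts are allowed.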
