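Message not routed to DLQ after retries

After the retry limit is reached, the message is not pushed to the DLQ. The message is also removed from the main queue, and once the retries finish I get empty logs. The retries themselves do run for the configured number of attempts, but after that nothing happens. I also tried throwing AmqpRejectAndDontRequeueException from the consumer, but I still face the same problem: the DLQ receives no messages.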
@Bean
Queue testMsgDLQ() {
    return QueueBuilder.durable(testDLQ).build();
}

@Bean
DirectExchange testMsgDLX() {
    return new DirectExchange(testDLX);
}

@Bean
Binding testDLBinding(Queue testMsgDLQ, DirectExchange testMsgDLX) {
    return BindingBuilder.bind(testMsgDLQ)
            .to(testMsgDLX).with(testDLQ);
}

@Bean
Queue testMsgQueue() {
    return QueueBuilder.durable(testQueue)
            .withArgument("x-dead-letter-exchange", testDLX)
            .withArgument("x-dead-letter-routing-key", testDLQ).build();
}

@Bean
DirectExchange testMsgExchange() {
    return new DirectExchange(testExchange);
}

@Bean
Binding testMessageQueue(Queue testMsgQueue, DirectExchange testMsgExchange) {
    return BindingBuilder.bind(testMsgQueue)
            .to(testMsgExchange).with(testQueue);
}
**Configuration**
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setHost(host);
    cachingConnectionFactory.setPort(port);
    cachingConnectionFactory.setUsername(username);
    cachingConnectionFactory.setPassword(password);
    cachingConnectionFactory.setAddresses(address);
    cachingConnectionFactory.setUri(url);
    /* cachingConnectionFactory.setPublisherReturns(true);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);*/
    cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
    cachingConnectionFactory.setRequestedHeartBeat(10);
    return cachingConnectionFactory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(concurrentConsumers);
    factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
    factory.setDefaultRequeueRejected(false);
    factory.setAdviceChain(RetryInterceptorBuilder.stateless()
            .recoverer(new RejectAndDontRequeueRecoverer())
            .maxAttempts(5)
            .backOffOptions(2000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build());
    return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory publisherConnectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(publisherConnectionFactory);
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
    return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
    return new MappingJackson2MessageConverter();
}

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(consumerJackson2MessageConverter());
    return factory;
}

@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
**Consumer**
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "${rabbitmq.email.queue.name}")
public void receiveMessage(MessageKeyVo messageId) {
    System.out.println("Demo");
    throw new RuntimeException("q");
}
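
For reference, the back-off settings in the container factory above work out as follows. With maxAttempts(5) there are four waits between the five delivery attempts, and each wait is the previous one multiplied by 2.0, capped at 10000 ms. The sketch below is plain Java that mirrors that arithmetic; it is not Spring Retry's own code, and BackoffSchedule and delays are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    /** Returns the waits (in ms) between attempts for an exponential back-off capped at maxInterval. */
    public static List<Long> delays(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> delays = new ArrayList<>();
        double interval = initialInterval;
        // One wait sits between each pair of consecutive attempts: maxAttempts - 1 waits in total.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            delays.add((long) Math.min(interval, maxInterval));
            interval *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from the question: maxAttempts(5), backOffOptions(2000, 2.0, 10000).
        List<Long> delays = delays(5, 2000, 2.0, 10000);
        long total = delays.stream().mapToLong(Long::longValue).sum();
        System.out.println(delays + " total=" + total + " ms");
    }
}
```

Running main prints `[2000, 4000, 8000, 10000] total=24000 ms`, so roughly 24 seconds should pass between the first delivery and the point where the recoverer rejects the message.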
1 answer
It's not clear what is wrong with your code, but the feature works fine:
Here is the Spring Boot code I used to test against a locally running RabbitMQ.
In application.properties I have just one setting: it enables retry on the listener container with 3 attempts and uses RejectAndDontRequeueRecoverer as the default recovery strategy. See AbstractRabbitListenerContainerFactoryConfigurer for details.
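
Assuming the setting described is Spring Boot's standard retry switch for the simple listener container, application.properties would contain a single line like this (3 is Boot's default for max-attempts, so it needs no entry of its own):

```properties
# Assumption: the standard Spring Boot property; the exact line is not shown in this answer.
spring.rabbitmq.listener.simple.retry.enabled=true
```

Note that this only takes effect for the container factory that Spring Boot auto-configures. A hand-built SimpleRabbitListenerContainerFactory, like the one in the question, has to set its own advice chain.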
The code looks like this:
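So I have exactly the same AMQP objects as in your code, plus a @RabbitListener that simply throws an exception when a message is received. In the logs, after sending and receiving a message, I have:
Everything shows that this works as expected.
Please share a simple, minimal project that reproduces the problem.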
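
One thing that may be worth checking while preparing the reproduction: when a consumer rejects a message with requeue=false, RabbitMQ dead-letters it only if the queue, as it actually exists on the broker, was declared with x-dead-letter-exchange. Otherwise the message is simply dropped, which would match a message vanishing from the main queue with nothing arriving in the DLQ. The toy model below is plain Java, not the RabbitMQ client; DeadLetterModel and its methods are hypothetical names used only to illustrate that rule.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class DeadLetterModel {

    // Arguments the main queue was declared with on the (modelled) broker.
    private final Map<String, Object> mainQueueArgs;
    // Stand-in for the queue bound to the dead-letter exchange.
    private final Deque<String> deadLetterQueue = new ArrayDeque<>();

    public DeadLetterModel(Map<String, Object> mainQueueArgs) {
        this.mainQueueArgs = new HashMap<>(mainQueueArgs);
    }

    /** Models a reject with requeue=false for a message taken from the main queue. */
    public void rejectWithoutRequeue(String message) {
        if (mainQueueArgs.containsKey("x-dead-letter-exchange")) {
            // The queue has a DLX, so the message is republished there and lands in the DLQ.
            deadLetterQueue.add(message);
        }
        // Without a DLX argument the message is discarded.
    }

    public int deadLetterCount() {
        return deadLetterQueue.size();
    }

    public static void main(String[] args) {
        DeadLetterModel withDlx =
                new DeadLetterModel(Map.<String, Object>of("x-dead-letter-exchange", "testDLX"));
        DeadLetterModel withoutDlx = new DeadLetterModel(Map.<String, Object>of());

        withDlx.rejectWithoutRequeue("m1");
        withoutDlx.rejectWithoutRequeue("m1");

        System.out.println(withDlx.deadLetterCount() + " " + withoutDlx.deadLetterCount());
    }
}
```

Running main prints `1 0`. On a real broker, the queue's arguments can be inspected in the management UI or with `rabbitmqctl list_queues name arguments` to see which of the two cases applies.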