Spring: Closing RabbitMQ consumer connections without shutting down the Spring application
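We need to close the RabbitMQ connections, wait a few minutes while other resources are cleaned up, and then shut down the Spring application. We have a 6-minute buffer for cleaning up the other resources. During that window, however, the RabbitMQ connections are re-created automatically.
We create the connections as follows: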
@Configuration
public class RabbitMQSubscribersHolder extends BaseRabbitMQConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSubscribersHolder.class);
    // One listener container per MQ alias
    private ConcurrentMap<String, SimpleMessageListenerContainer> containers = new ConcurrentHashMap<>();

    public RabbitMQSubscribersHolder() {
    }

    public void addSubscriber(String mqAlias, MessageListener receiver) {
        ConnectionFactory connectionFactory = this.connectionFactory(mqAlias);
        RabbitAdmin admin = null;
        try {
            admin = new RabbitAdmin(connectionFactory);
            AbstractExchange exchange = this.exchange(mqAlias);
            admin.declareExchange(exchange);
            Queue queue = null;
            if (!StringUtils.isEmpty(this.getMqQueueNameForQueueAlias(mqAlias))) {
                // Named queue configured for this alias
                queue = this.queue(mqAlias);
                admin.declareQueue(queue);
            } else {
                // No name configured: let the broker generate one
                queue = admin.declareQueue();
            }
            admin.declareBinding(this.binding(mqAlias, queue, exchange));
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(new String[]{queue.getName()});
            container.setMessageListener(this.listenerAdapter(receiver));
            container.setRabbitAdmin(admin);
            container.start();
            this.containers.put(mqAlias, container);
        } catch (Exception var8) {
            LOGGER.error(var8.getMessage(), var8);
        }
    }

    MessageListenerAdapter listenerAdapter(MessageListener receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }

    public SimpleMessageListenerContainer getContainer(String mqAlias) {
        return this.containers.get(mqAlias);
    }
}
This is our destroy method:
public void destroy() {
    Iterator var1 = this.aliasToConnectionFactory.entrySet().iterator();
    while(var1.hasNext()) {
        Map.Entry<String, CachingConnectionFactory> entry = (Map.Entry)var1.next();
        try {
            ((CachingConnectionFactory)entry.getValue()).destroy();
            LOGGER.info("RabbitMQ caching connection factory closed for alias: {}", entry.getKey());
        } catch (Exception var4) {
            LOGGER.error("RabbitMQ caching connection destroy operation failed for alias: {} due to {}", entry.getKey(), StringUtils.getStackTrace(var4));
        }
    }
}
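It appears that the connections are re-created automatically after being destroyed (verified in the RabbitMQ management UI). Here are the logs: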
2022-07-11 19:42:55.334 INFO 35761 --- [ Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ : RabbitMQ caching connection factory closed for alias: preMatchFreeze
2022-07-11 19:42:55.395 INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-tSAgXuPdMnLd9q79ryBWGg=fury_pre_matchfreeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://gwportal@10.24.128.76:5672/,1), conn: Proxy@205b73d8 Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:42:55.405 INFO 35761 --- [ Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ : RabbitMQ caching connection factory closed for alias: parentMatchFreezeEds
2022-07-11 19:42:55.447 INFO 35761 --- [ Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ : RabbitMQ caching connection factory closed for alias: fantasy_matchFreeze
2022-07-11 19:42:55.489 INFO 35761 --- [ Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ : RabbitMQ caching connection factory closed for alias: parentMatchFreezeCore
2022-07-11 19:42:55.489 INFO 35761 --- [ Thread-11] c.g.f.fury.GracefulShutdownHandler : Waiting before starting graceful shutdown
2022-07-11 19:42:55.794 INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-sK8_FY4PtHW52HXkQV3S_w=fury_match_freeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://gwportal@10.24.128.76:5672/,1), conn: Proxy@4bc9389 Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:42:56.046 INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@3828e912 [delegate=amqp://gwportal@10.24.128.76:5672/, localPort= 58170]
2022-07-11 19:42:56.098 INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-KyvnWaqZco9NRTessjupJQ=fantasy_parent_match_freeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://gwportal@10.24.128.76:5672/,1), conn: Proxy@17d45cfb Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:43:13.127 INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@4c3cefad [delegate=amqp://gwportal@10.24.141.77:5672/, localPort= 58182]
2022-07-11 19:43:16.849 INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@801eaf8 [delegate=amqp://gwportal@10.24.141.77:5672/, localPort= 58185]
1 Answer
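You need to stop the message listener containers to prevent them from trying to reconnect. Destroying the connection factories alone is not enough: as the "Restarting Consumer" lines in your log show, each running SimpleMessageListenerContainer restarts its consumer after the connection is closed, which opens a new connection.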
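The ordering matters: every container must be stopped before any connection factory is destroyed. Below is a minimal sketch of that ordering, not the poster's actual code. The class OrderedRabbitShutdown and its register/shutdown methods are hypothetical names, and each step is passed in as a plain Runnable so the sketch compiles without Spring on the classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: stop every consumer first, then close the connections. */
final class OrderedRabbitShutdown {

    // Steps keyed by MQ alias. In a real application each Runnable would wrap
    // a Spring AMQP call (assumption: container::stop and factory::destroy).
    private final Map<String, Runnable> containerStops = new LinkedHashMap<>();
    private final Map<String, Runnable> factoryDestroys = new LinkedHashMap<>();

    void register(String alias, Runnable stopContainer, Runnable destroyFactory) {
        containerStops.put(alias, stopContainer);
        factoryDestroys.put(alias, destroyFactory);
    }

    /** Runs both phases and returns the aliases for which a step failed. */
    List<String> shutdown() {
        List<String> failed = new ArrayList<>();
        // Phase 1: stop all listener containers so nothing restarts a consumer.
        runAll(containerStops, failed);
        // Phase 2: only now close the connections.
        runAll(factoryDestroys, failed);
        return failed;
    }

    private static void runAll(Map<String, Runnable> steps, List<String> failed) {
        for (Map.Entry<String, Runnable> step : steps.entrySet()) {
            try {
                step.getValue().run();
            } catch (RuntimeException ex) {
                // Keep going: one failing alias must not block the others.
                if (!failed.contains(step.getKey())) {
                    failed.add(step.getKey());
                }
            }
        }
    }
}
```

In the code from the question, the first Runnable would presumably be container::stop for each SimpleMessageListenerContainer held in containers, and the second would be the existing destroy() call on the matching CachingConnectionFactory from aliasToConnectionFactory. Running the phases across all aliases, rather than alias by alias, ensures no container is still running when the first connection is closed.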