How to close RabbitMQ connections before the Spring context shuts down

Posted by hgb9j2n6 on 2022-11-23 in RabbitMQ

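Spring - Close RabbitMQ consumer connections without shutting down the Spring application
We need to close the RabbitMQ connections, wait a few minutes while other resources are cleaned up, and only then shut down the Spring application. We have a 6-minute buffer for cleaning up the other resources. During that window, however, the RabbitMQ connections are re-created automatically.
We create the connections as follows: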

@Configuration
public class RabbitMQSubscribersHolder extends BaseRabbitMQConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSubscribersHolder.class);
    private final ConcurrentMap<String, SimpleMessageListenerContainer> containers = new ConcurrentHashMap<>();

    public RabbitMQSubscribersHolder() {
    }

    public void addSubscriber(String mqAlias, MessageListener receiver) {
        ConnectionFactory connectionFactory = this.connectionFactory(mqAlias);
        RabbitAdmin admin = null;

        try {
            admin = new RabbitAdmin(connectionFactory);
            AbstractExchange exchange = this.exchange(mqAlias);
            admin.declareExchange(exchange);
            Queue queue = null;
            if (!StringUtils.isEmpty(this.getMqQueueNameForQueueAlias(mqAlias))) {
                queue = this.queue(mqAlias);
                admin.declareQueue(queue);
            } else {
                queue = admin.declareQueue();
            }

            admin.declareBinding(this.binding(mqAlias, queue, exchange));
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queue.getName());
            container.setMessageListener(this.listenerAdapter(receiver));
            container.setRabbitAdmin(admin);
            container.start();
            this.containers.put(mqAlias, container);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }

    }

    MessageListenerAdapter listenerAdapter(MessageListener receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }

    public SimpleMessageListenerContainer getContainer(String mqAlias) {
        return this.containers.get(mqAlias);
    }
}

This is our destroy method:

public void destroy() {
    // Close the cached connection factory registered for each alias.
    for (Map.Entry<String, CachingConnectionFactory> entry : this.aliasToConnectionFactory.entrySet()) {
        try {
            entry.getValue().destroy();
            LOGGER.info("RabbitMQ caching connection factory closed for alias: {}", entry.getKey());
        } catch (Exception e) {
            LOGGER.error("RabbitMQ caching connection destroy operation failed for alias: {} due to {}", entry.getKey(), StringUtils.getStackTrace(e));
        }
    }
}

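It looks like the connections are re-created automatically after they are destroyed (verified in the RabbitMQ management UI). Here are the logs: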

2022-07-11 19:42:55.334  INFO 35761 --- [      Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ       : RabbitMQ caching connection factory closed for alias: preMatchFreeze
2022-07-11 19:42:55.395  INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-tSAgXuPdMnLd9q79ryBWGg=fury_pre_matchfreeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://gwportal@10.24.128.76:5672/,1), conn: Proxy@205b73d8 Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:42:55.405  INFO 35761 --- [      Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ       : RabbitMQ caching connection factory closed for alias: parentMatchFreezeEds
2022-07-11 19:42:55.447  INFO 35761 --- [      Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ       : RabbitMQ caching connection factory closed for alias: fantasy_matchFreeze
2022-07-11 19:42:55.489  INFO 35761 --- [      Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ       : RabbitMQ caching connection factory closed for alias: parentMatchFreezeCore
2022-07-11 19:42:55.489  INFO 35761 --- [      Thread-11] c.g.f.fury.GracefulShutdownHandler       : Waiting before starting graceful shutdown
2022-07-11 19:42:55.794  INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-sK8_FY4PtHW52HXkQV3S_w=fury_match_freeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://gwportal@10.24.128.76:5672/,1), conn: Proxy@4bc9389 Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:42:56.046  INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@3828e912 [delegate=amqp://gwportal@10.24.128.76:5672/, localPort= 58170]
2022-07-11 19:42:56.098  INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-KyvnWaqZco9NRTessjupJQ=fantasy_parent_match_freeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://gwportal@10.24.128.76:5672/,1), conn: Proxy@17d45cfb Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:43:13.127  INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@4c3cefad [delegate=amqp://gwportal@10.24.141.77:5672/, localPort= 58182]
2022-07-11 19:43:16.849  INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@801eaf8 [delegate=amqp://gwportal@10.24.141.77:5672/, localPort= 58185]

Answer #1 (jtjikinw)

You need to stop the message listener containers to prevent them from trying to reconnect.
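
The ordering this implies could be sketched roughly as below: stop every listener container first, and only then close the connection factories, so that no consumer is left running to reopen a connection (the "Restarting Consumer" lines in the logs). This is a minimal sketch, not the poster's actual code. `OrderedRabbitShutdown`, `registerContainer`, and `registerFactory` are hypothetical names, and the sketch takes plain `Runnable` actions so that it compiles without Spring on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Spring AMQP): runs all "stop container"
 * actions before any "close connection factory" action.
 */
final class OrderedRabbitShutdown {

    // Actions that stop a listener container, keyed by alias.
    private final Map<String, Runnable> containerStops = new LinkedHashMap<>();
    // Actions that close a connection factory, keyed by alias.
    private final Map<String, Runnable> factoryCloses = new LinkedHashMap<>();

    public void registerContainer(String alias, Runnable stopAction) {
        containerStops.put(alias, stopAction);
    }

    public void registerFactory(String alias, Runnable closeAction) {
        factoryCloses.put(alias, closeAction);
    }

    /**
     * Stops every container, then closes every factory.
     * A failing step is logged and does not prevent the remaining steps.
     *
     * @return the number of steps that threw an exception
     */
    public int shutdown() {
        int failures = runAll("stop container", containerStops);
        failures += runAll("close connection factory", factoryCloses);
        return failures;
    }

    private static int runAll(String what, Map<String, Runnable> actions) {
        int failures = 0;
        for (Map.Entry<String, Runnable> entry : actions.entrySet()) {
            try {
                entry.getValue().run();
            } catch (RuntimeException ex) {
                failures++;
                System.err.println("Failed to " + what + " for alias "
                        + entry.getKey() + ": " + ex);
            }
        }
        return failures;
    }
}

// Assumed wiring with the classes from the question (requires Spring AMQP):
//   shutdown.registerContainer(alias, () -> holder.getContainer(alias).stop());
//   shutdown.registerFactory(alias, () -> cachingConnectionFactory.destroy());
//   shutdown.shutdown();   // then wait out the cleanup buffer, then close the context
```

In the question's setup, `shutdown()` would be called at the point where `destroy()` is called today, before the waiting period starts. Whether `stop()` returns quickly depends on the container's shutdown timeout and on any messages still being processed, so the exact timing here is an assumption to verify.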
