RabbitMQ客户端如何判断它何时失去与服务器的连接?

slhcrj9b  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(3)|浏览(187)

如果我连接到RabbitMQ并使用EventingBasicConsumer侦听事件,我如何判断我是否与服务器断开连接?
我知道有一个故障事件,但如果我拔掉网线以模拟故障,它不会触发。
我还尝试了ModelException事件,以及模型上的CallbackException,但似乎都不起作用。
编辑-我标记为答案的那个是正确的,但它对我来说只是解决方案的一部分。RabbitMQ中还内置了HeartBeat功能。服务器在配置文件中指定它。它默认为10分钟,当然你可以改变它。
客户端还可以通过在ConnectionFactory示例上设置RightedHeartbeat值来请求不同的心跳间隔。

wribegjk

wribegjk1#

我猜你正在使用C#库?(但即使如此,我认为其他人也有类似的事件)。
您可以执行以下操作:

public class MyRabbitConsumer
{
  private IConnection connection;

  public void Connect()
  {
    connection = CreateAndOpenConnection();
    connection.ConnectionShutdown += connection_ConnectionShutdown;
  }

  public IConnection CreateAndOpenConnection() { ... }

  private void connection_ConnectionShutdown(IConnection connection, ShutdownEventArgs reason)
  {

  }
}
deyfvvtc

deyfvvtc2#

这是一个例子,但标记的答案是什么导致我这一点。

var factory = new ConnectionFactory
{
    HostName = "MY_HOST_NAME",
    UserName = "USERNAME",
    Password = "PASSWORD",
    RequestedHeartbeat = 30
};

using (var connection = factory.CreateConnection())
{
    connection.ConnectionShutdown += (o, e) =>
    {                       
        //handle disconnect                            
    };

    using (var model = connection.CreateModel())
    {
        model.ExchangeDeclare(EXCHANGE_NAME, "topic");
        var queueName = model.QueueDeclare();

        model.QueueBind(queueName, EXCHANGE_NAME, "#"); 

        var consumer = new QueueingBasicConsumer(model);
        model.BasicConsume(queueName, true, consumer);

        while (!stop)
        {
            BasicDeliverEventArgs args;                       
            consumer.Queue.Dequeue(5000, out args);

            if (stop) return;

            if (args == null) continue;
            if (args.Body.Length == 0) continue;

            Task.Factory.StartNew(() =>
            {
                //Do work here on different thread then this one
            }, TaskCreationOptions.PreferFairness);
        }
    }
}

有几件事要注意。
我使用#作为主题。它能抓住一切。通常,你会选择一个主题。
我设置了一个名为“stop”的变量来决定进程何时结束。你会注意到这个循环会一直运行直到这个变量为true。
Dequeue等待5秒,然后离开,如果没有新消息,则不获取数据。这是为了确保我们监听停止变量,并在某个时候实际退出。根据您的喜好更改值。
当一条消息进来时,我在一个新线程上生成处理代码。当前线程被保留用于监听rabbitmq消息,如果一个处理程序需要太长的时间来处理,我不希望它减慢其他消息的速度。您可能需要也可能不需要这个,这取决于您的实现。但是在编写处理消息的代码时要小心。如果它需要一分钟的时间来运行,并且您以亚秒的时间获得消息,那么您将耗尽内存或至少遇到严重的性能问题。

piah890a

piah890a3#

我建议你创建一个关机监听器**(conn. addShutdownloads)**。当连接出现任何问题时,您将能够听到原因。看看怎么做。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("user");
factory.setPassword("yourpass");
factory.setPort("5671");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
conn.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause) {
        System.out.println("Connection closed by a reason" + cause.toString())
    }
});

更多信息在这里:https://www.rabbitmq.com/api-guide.html

相关问题