rabbitmq:通道级异常恢复

3b6akqbq  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(187)

我正在处理一个从rabbitmq队列中使用的worker服务,并且我正在尝试找出如何处理通道关闭事件,例如:假设我的消费者在30分钟内没有确认经纪人,经纪人因此关闭了通道。
我知道rabbitmq clinet库(我使用的是C#库)会在连接关闭时自动尝试重新连接,但当连接处于活动状态但通道关闭时,最佳做法是什么?我可以注册“通道关闭”事件的处理程序,但除了记录它之外,我还应该在此处理程序中做什么?我毕竟希望继续从相关队列中使用。
这是我的代码,我试图再次创建通道,但我得到超时异常:

var consumer = ...

    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    channel.BasicQos(0, 100, false);

    channel.ModelShutdown += (sender, args) =>
                    {
                        try
                        {
                            Log.Error($"channel was shut down");
                            channel = _connection.CreateModel();
                            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                            channel.BasicQos(0, 100, false)

                        }
                        catch (Exception exception)
                        {
                            Log.Error(exception);
                        }
zzlelutf

zzlelutf1#

据我所知,这个问题与以下事实有关:事件处理发生在试图访问RabbitMQ服务器的同一个线程中,或者有一个锁阻止了通道的创建。

private async void RecreateChannel(object sender, ShutdownEventArgs e)
    {
        _logger.LogWarning($"Channel was shutdowned, whit reason: {e.ReplyText}, code: {e.ReplyCode}, trying to reconnect");

        _channel.Dispose();
        while (!_channel.IsOpen)
        {
            try
            {
                await Task.Run(() => _channel = InitChannel());
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, "Failed to recreate channel, trying again");
            }
        }
        _logger.LogInformation("Channel was recreated successfully");
    }

相关问题