RabbitMQ异步支持

thtygnil  于 2022-12-18  发布在  RabbitMQ
关注(0)|答案(5)|浏览(302)

RabbitMQ.NET客户机是否有任何类型的异步支持?我希望能够异步地连接和使用消息,但是到目前为止还没有找到一种方法来做到这两点。

  • (对于使用消息,我可以使用EventingBasicConsumer,但这不是完整的解决方案。)*

下面是我目前如何使用RabbitMQ的一个示例(代码摘自我的博客),只是为了给予一些背景:

var factory = new ConnectionFactory() { HostName = "localhost" };

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
        channel.BasicConsume("testqueue", true, consumer);

        Console.ReadLine();
    }
}
332nm8kg

332nm8kg1#

Rabbit支持使用AsyncEventingBasicConsumer类调度到异步消息处理程序。它的工作方式与EventingBasicConsumer类似,但允许您注册一个回调,该回调返回Task。回调被调度到RabbitMQ客户端,并等待返回的Task

var factory = new ConnectionFactory
{
    HostName = "localhost",
    DispatchConsumersAsync = true
};

using(var connection = cf.CreateConnection())
{
    using(var channel = conn.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);

        var consumer = new AsyncEventingBasicConsumer(model);

        consumer.Received += async (o, a) =>
        {
            Console.WriteLine("Message Get" + a.DeliveryTag);
            await Task.Yield();
        };
    }

    Console.ReadLine();
}
nzkunb0c

nzkunb0c2#

此时,RabbitMQ .NET客户端没有内置的async/await支持。RabbitMQ .NET Client repository上有一个打开的票证

e7arh2l6

e7arh2l63#

总结当前async/TPL支持:

  • 正如@paul-turner提到的,现在有一个AsyncEventingBasicConsumer,您可以为它注册事件并返回一个Task
  • 还有一个AsyncDefaultBasicConsumer,您可以覆盖它的虚方法(如HandleBasicDeliver)并返回Task。原始PR here(看起来它也是在5.0中引入的?)
  • 根据对上述PR和this issue的最后评论,看起来他们正在开发一个新的、从头开始的.NET客户端,该客户端将更全面地支持async操作,但我没有看到任何具体的链接。
i2loujxw

i2loujxw4#

AsyncEventingBasicConsumer,它所做的一切就是在接收消息时await调用你的异步“事件处理程序”。这是这里唯一异步的事情。通常你不会从中得到任何好处。因为你只有一个“handler”,消息仍然是一条一条处理的,是同步处理的!此外,由于等待是在Consumer内部完成的,因此您将失去对异常处理的控制。
让我猜猜,您所说的异步消息处理是指某种程度的并行性。
我最终使用的是TPL Dataflow的ActionBlockActionBlock运行您配置的任务,管理等待和并行。由于它在任务上操作,而不是在线程上操作,所以只要它们是真正异步的,它就可以用更少的资源进行管理。
1.常规的EventingBasicConsumer调用actionBlock.Post(something)
1.对于并行处理,您需要在ack之前告诉RMQ向您发送N条消息:model.BasicQos(0, N, true);

  1. ActionBlock具有带有MaxDegreeOfParallelism属性的选项,该属性也需要设置为N。
  2. ActionBlock运行async Task,这些async Task接收使用者先前发布的数据。不应引发任务,因为ActionBlock会停止对异常的所有处理。
    1.请小心传递CancellationToken,并正确等待ActionBlock完成所有正在运行的任务:x1米11米1x
ahy6op9u

ahy6op9u5#

当你创建一个通道的时候,你可以设置DispatchConsumersAsync为true,这将提供异步消息处理,默认情况下这个值是false;

ConnectionFactory connection = new ConnectionFactory()
        {
            UserName = _configuration.Username,
            Password = _configuration.Password,
            HostName = _configuration.HostName
        };
        connection.DispatchConsumersAsync = true; <==== set this to true
        var channel = connection.CreateConnection();
        return channel;

那么我们应该使用异步事件基本消费者而不是事件基本消费者

相关问题