RabbitMQ.NET客户机是否有任何类型的异步支持?我希望能够异步地连接和使用消息,但是到目前为止还没有找到一种方法来做到这两点。
- (对于使用消息,我可以使用EventingBasicConsumer,但这不是完整的解决方案。)*
下面是我目前如何使用RabbitMQ的一个示例(代码摘自我的博客),只是为了给予一些背景:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("testqueue", true, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume("testqueue", true, consumer);
Console.ReadLine();
}
}
5条答案
按热度按时间332nm8kg1#
Rabbit支持使用
AsyncEventingBasicConsumer
类调度到异步消息处理程序。它的工作方式与EventingBasicConsumer
类似,但允许您注册一个回调,该回调返回Task
。回调被调度到RabbitMQ客户端,并等待返回的Task
。nzkunb0c2#
此时,RabbitMQ .NET客户端没有内置的async/await支持。RabbitMQ .NET Client repository上有一个打开的票证
e7arh2l63#
总结当前
async
/TPL
支持:AsyncEventingBasicConsumer
,您可以为它注册事件并返回一个Task
。AsyncDefaultBasicConsumer
,您可以覆盖它的虚方法(如HandleBasicDeliver
)并返回Task
。原始PR here(看起来它也是在5.0中引入的?)async
操作,但我没有看到任何具体的链接。i2loujxw4#
有
AsyncEventingBasicConsumer
,它所做的一切就是在接收消息时await
调用你的异步“事件处理程序”。这是这里唯一异步的事情。通常你不会从中得到任何好处。因为你只有一个“handler”,消息仍然是一条一条处理的,是同步处理的!此外,由于等待是在Consumer内部完成的,因此您将失去对异常处理的控制。让我猜猜,您所说的异步消息处理是指某种程度的并行性。
我最终使用的是TPL Dataflow的
ActionBlock
。ActionBlock
运行您配置的任务,管理等待和并行。由于它在任务上操作,而不是在线程上操作,所以只要它们是真正异步的,它就可以用更少的资源进行管理。1.常规的
EventingBasicConsumer
调用actionBlock.Post(something)
。1.对于并行处理,您需要在
ack
之前告诉RMQ向您发送N条消息:model.BasicQos(0, N, true);
MaxDegreeOfParallelism
属性的选项,该属性也需要设置为N。async Task
,这些async Task
接收使用者先前发布的数据。不应引发任务,因为ActionBlock会停止对异常的所有处理。1.请小心传递
CancellationToken
,并正确等待ActionBlock完成所有正在运行的任务:x1米11米1xahy6op9u5#
当你创建一个通道的时候,你可以设置DispatchConsumersAsync为true,这将提供异步消息处理,默认情况下这个值是false;
那么我们应该使用异步事件基本消费者而不是事件基本消费者