rabbitmq MassTransit中端点间的并发限制(C#)

k3bvogb1  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(2)|浏览(164)

Mass Transit 3.5,RabbitMq
我有一个总线连接的服务,监听队列的数量。

  • Q1
  • Q2
  • Q3
  • Q4等

所有队列都接受相同的消息类型,并且都使用竞争的使用者模式,其中不同机器上的多个服务侦听这些相同的队列。
我所追求的是为每个进程在所有这些队列上设置一个并发限制。
我的资源有限(可用许可证数量),需要将并发请求限制在每台机器的可用许可证数量内。
所以机器A可能有4个,机器B可能有10个,等等。
如果我已经有4个消费者处理所有队列中的消息,我不希望机器A消费所有这些队列中的任何消息,如果其他机器(机器B、C等)有可用的资源,它应该让它们消费这些消息。
我的观点是

sbc.UseConcurrencyLimit(4)

内部

var bus = MassTransit.Bus.Factory.CreateUsingRabbitMq

是设置每个队列的并发限制,所以如果我有4个,他们都加起来。
我需要的是所有队列累积工作到该并发限制,但不超过它。
大众运输是否有一个内置的方法来实现这一目标?

xn1cxnb4

xn1cxnb41#

没有内置的方法来限制多个服务之间的并发。这将需要使用某种类型的全局资源管理器,它只是不是MassTransit支持开箱即用的东西。

nvbavucw

nvbavucw2#

我为MassTransit的最新版本提供了这个答案,目前是版本8。在版本8中,您可以在两个不同的上下文中限制并发。
首先,在定义ReceiveEndpoint时,可以为每个特定队列设置最大并发。下面是一个示例:

cfg.ReceiveEndpoint("Q_Import_1", e =>
{
    e.ConfigureConsumer<TrafficImportConsumer>(context);
    e.UseConcurrencyLimit(1);
});

cfg.ReceiveEndpoint("Q_Import_2", e =>
{
    e.ConfigureConsumer<TrafficImportConsumer>(context);
    e.UseConcurrencyLimit(1);
});

其次,在将MassTransit服务添加到应用程序时,您可以在那里设置“并发限制”选项,以定义整个应用程序的最大累积并发。下面是一个如何做到这一点的例子:

builder.Services.AddMassTransit(x => 
x.UsingInMemory((context, cfg) =>
{
    cfg.UseConcurrencyLimit(2);
    cfg.ConfigureEndpoints(context);
}));

在本例中,每个队列的最大并发数为1,但整个应用程序的并发任务总数不会超过两个。

相关问题