rabbitmq 如何在配置连接时将x-max-length和x-overflow添加到MassTransit队列?

2uluyalo  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(3)|浏览(218)

我有一个消费者,他需要从现有的RabbitMQ队列中消费消息。它工作正常,当队列配置正常,没有任何设置。

services.AddMassTransit(config =>
{
    config.AddConsumer<OrderConsumer>();
    config.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host("amqp://user:12345@localhost:54425");
        cfg.ReceiveEndpoint("transient-order-queue", c =>
        {
            c.ConfigureConsumer<OrderConsumer>(ctx);
        });
    });
});
services.AddMassTransitHostedService();

为了完成一些工作,我需要用几个特性来配置队列。

Features    
x-max-length:   1000
x-overflow: reject-publish
arguments:  
x-queue-type:   classic
durable:    true

如何配置我的消费者连接到该队列?它给了我这样的错误:
PRECONDITION_FANORTH-vhost“/”中队列“transient-order-queue”的不等效参数"x-max-length“:接收到的none但current是类型为“long”的值“1000”
但是我不明白,如何将这些参数添加到MassTransit配置中。拜托,帮帮我!

ftf50wuq

ftf50wuq1#

只需将它们添加为队列属性:

cfg.ReceiveEndpoint("transient-order-queue", c =>
        {
            c.QueueAttributes["x-max-length"] = 1000;
            c.QueueAttributes["x-overflow"] = "reject-publish";
            c.ConfigureConsumer<OrderConsumer>(ctx);
        });
gt0wga4j

gt0wga4j2#

找到答案了。可以使用以下语法向队列添加属性:

cfg.ReceiveEndpoint("transient-order-queue", c =>
{
    c.ConfigureConsumer<OrderConsumer>(ctx);
    c.SetQueueArgument("x-overflow", "reject-publish");
    c.SetQueueArgument("x-max-length", 1000);
});
7lrncoxx

7lrncoxx3#

我就是用这个方案实现IConfigureReceiveEndpoint接口的

public class ConfigureReceiveEndpointQueueSize : IConfigureReceiveEndpoint
{
    public void Configure(string name, IReceiveEndpointConfigurator configurator)
    {
        if (configurator is IRabbitMqReceiveEndpointConfigurator rabbitMqConfigurator)
        {
            rabbitMqConfigurator.SetQueueArgument("x-max-length", 10000);
            rabbitMqConfigurator.SetQueueArgument("x-overflow", "drop-head");
        }
    }
}

然后将其添加到STARCHING中的DI

services.AddTransient<IConfigureReceiveEndpoint, ConfigureReceiveEndpointQueueSize>();

此队列属性将添加到每个使用者消息队列中

相关问题