如何在RabbitMQ中使用MassTransit基于头的消息?

wgx48brx  于 2023-06-23  发布在  RabbitMQ
关注(0)|答案(1)|浏览(198)

我试图在RabbitMQ中使用MassTransit创建一个头交换,因此消费者仅根据消息中的头来消费来自特定队列的消息。
我配置了我的生产者:

builder.Services.AddMassTransit(mt =>
{
    mt.SetKebabCaseEndpointNameFormatter();

    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.Publish<OrderSubmitted>(p =>
        {
            p.ExchangeType = "headers";
        });
    });
});

这是我的消费者配置:

builder.Services.AddMassTransit(mt =>
{
    mt.AddConsumer<OrderPickupConsumer>();
    mt.AddConsumer<OrderDeliveryConsumer>();

    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ReceiveEndpoint("OrderPickup", re =>
        {
            re.ConfigureConsumer<OrderPickupConsumer>(context);
            re.Bind<OrderSubmitted>(x =>
            {
                x.ExchangeType = "headers";
                x.SetBindingArgument("headers", new Dictionary<string, string>
                {
                    { "Transport", "pickup" },
                    { "x-match", "all" }
                });
            });
        });

        cfg.ReceiveEndpoint("OrderDelivery", re =>
        {
            re.ConfigureConsumer<OrderDeliveryConsumer>(context);
            re.Bind<OrderSubmitted>(x =>
            {
                x.ExchangeType = "headers";
                x.SetBindingArgument("headers", new Dictionary<string, string>
                {
                    { "Transport", "delivery" },
                    { "x-match", "all" }
                });
            });
        });
    });
});

我这样发布消息:

_bus.Publish<OrderSubmitted>(new
            {
                __Header_Transport = "pickup",
                Product = "Pizza"
            });

            _bus.Publish<OrderSubmitted>(new
            {
                __Header_Transport = "delivery",
                Product = "Burgers"
            });

据我所知,上面的设置是正确的,但是当启动接收应用程序时,会抛出一个错误:ArgumentException: The MassTransit.RabbitMqTransport.Topology.ExchangeEntity entity settings did not match the existing entity
我有一个干净的RabbitMQ示例,没有任何以前存在的队列,即使RabbitMQ根本没有启动,我也会收到错误。
我的配置有问题吗?或者这可能是MassTransit中的一个bug?

sqserrrh

sqserrrh1#

经过一天的调查,我发现添加re.ConfigureConsumeTopology = false;时没有显示错误
但是,消息未传递到队列。为了解决这个问题,我更改了SetBindingArgument配置。

cfg.ReceiveEndpoint("OrderPickup", re =>
        {
            re.ConfigureConsumeTopology = false;
            re.ConfigureConsumer<OrderPickupConsumer>(context);
            re.Bind<OrderSubmitted>(x =>
            {
                x.ExchangeType = "headers";
                x.SetBindingArgument("Transport", "pickup");
                x.SetBindingArgument("x-match", "all");
            });
        });

        cfg.ReceiveEndpoint("OrderDelivery", re =>
        {
            re.ConfigureConsumeTopology = false;
            re.ConfigureConsumer<OrderDeliveryConsumer>(context);
            re.Bind<OrderSubmitted>(x =>
            {
                x.ExchangeType = "headers";
                x.SetBindingArgument("Transport", "delivery");
                x.SetBindingArgument("x-match", "all");
            });
        });

现在,根据头过滤器和使用者的拾取,消息被发送到正确的队列。

相关问题