我试图在RabbitMQ中使用MassTransit创建一个头交换,因此消费者仅根据消息中的头来消费来自特定队列的消息。
我配置了我的生产者:
builder.Services.AddMassTransit(mt =>
{
mt.SetKebabCaseEndpointNameFormatter();
mt.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.Publish<OrderSubmitted>(p =>
{
p.ExchangeType = "headers";
});
});
});
这是我的消费者配置:
builder.Services.AddMassTransit(mt =>
{
mt.AddConsumer<OrderPickupConsumer>();
mt.AddConsumer<OrderDeliveryConsumer>();
mt.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("OrderPickup", re =>
{
re.ConfigureConsumer<OrderPickupConsumer>(context);
re.Bind<OrderSubmitted>(x =>
{
x.ExchangeType = "headers";
x.SetBindingArgument("headers", new Dictionary<string, string>
{
{ "Transport", "pickup" },
{ "x-match", "all" }
});
});
});
cfg.ReceiveEndpoint("OrderDelivery", re =>
{
re.ConfigureConsumer<OrderDeliveryConsumer>(context);
re.Bind<OrderSubmitted>(x =>
{
x.ExchangeType = "headers";
x.SetBindingArgument("headers", new Dictionary<string, string>
{
{ "Transport", "delivery" },
{ "x-match", "all" }
});
});
});
});
});
我这样发布消息:
_bus.Publish<OrderSubmitted>(new
{
__Header_Transport = "pickup",
Product = "Pizza"
});
_bus.Publish<OrderSubmitted>(new
{
__Header_Transport = "delivery",
Product = "Burgers"
});
据我所知,上面的设置是正确的,但是当启动接收应用程序时,会抛出一个错误:ArgumentException: The MassTransit.RabbitMqTransport.Topology.ExchangeEntity entity settings did not match the existing entity
我有一个干净的RabbitMQ示例,没有任何以前存在的队列,即使RabbitMQ根本没有启动,我也会收到错误。
我的配置有问题吗?或者这可能是MassTransit中的一个bug?
1条答案
按热度按时间sqserrrh1#
经过一天的调查,我发现添加
re.ConfigureConsumeTopology = false;
时没有显示错误但是,消息未传递到队列。为了解决这个问题,我更改了
SetBindingArgument
配置。现在,根据头过滤器和使用者的拾取,消息被发送到正确的队列。