我有两个应用程序,一个用于发布,另一个用于订阅。
让我们看看用于注册masstransit的发布代码。
context.Services.AddMassTransit<IEmployeeSvcBus>(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost:5672");
cfg.Message<ICreateEmployeeEto>(m => m.SetEntityName("CreateEmployee"));
cfg.Message<IUpdateEmployeeEto>(m => m.SetEntityName("UpdateEmployee"));
cfg.Publish<ICreateEmployeeEto>(y =>
{
y.ExchangeType = ExchangeType.FanOut.ToString().ToLower();
y.AutoDelete = false;
y.Durable = true;
});
cfg.Publish<IUpdateEmployeeEto>(y =>
{
y.ExchangeType = ExchangeType.FanOut.ToString().ToLower();
y.AutoDelete = false;
y.Durable = true;
});
});
});
我们使用多个总线,因此我使用IEmployeeSvcBus
作为这个总线。
public interface IEmployeeSvcBus: IBus
{
}
对于上面的配置,我的意思是说,ICreateEmployeeEto和IUpdateEmployeeEto的消息类型应该分别发布到名为CreateEmployee和UpdateEmployee的交换机。
问题1:存在使用以下项创建的交换:<namespace>:ICreateEmployeeEto
和<namespace>:IUpdateEmployeeEto
。
目前,我在RabbitMQ中手动将它们分别绑定到CreateEmployee
和UpdateEmployee
。
现在让我们来看看消费者应用程序的配置,同时也看看消费者类。
context.Services.AddMassTransit<IEmployeeSvcBus>(x =>
{
x.AddConsumer<CreateEmployeeConsumer>();
x.AddConsumer<UpdateEmployeeConsumer>();
x.UsingRabbitMq((context, busCfg) =>
{
busCfg.Host("rabbitmq://localhost:5672");
busCfg.ReceiveEndpoint("employees.create_dev", ep =>
{
ep.Bind("CreateEmployee");
ep.ConfigureConsumeTopology = false;
ep.Durable = true;
ep.Lazy = true;
ep.Consumer<CreateEmployeeConsumer>();
});
busCfg.ReceiveEndpoint("employees.update_dev", ep =>
{
ep.Bind("UpdateEmployee");
ep.ConfigureConsumeTopology = false;
ep.Durable = true;
ep.Lazy = true;
ep.Consumer<UpdateEmployeeConsumer>();
});
});
});
另请参阅消费者定义:
public class CreateEmployeeConsumer: IConsumer<ICreateEmployeeEto>
{
private readonly IBus _localBus;
private readonly ImySvc _mySvc;
public CreateEmployeeConsumer(
IBus localBus,
ImySvc mySvc)
{
_localBus = localBus;
_mySvc= mySvc;
}
public CreateEmployeeConsumer()
{
//For Bus Registration
}
public async Task Consume(ConsumeContext<ICreateEmployeeEto> context)
{
//Some operations.
}
}
问题2:当我发布CreateEmployeeEto时,它将进入employees.create_dev_skipped队列。
我如何检查可能导致问题的原因?我错过了代码中的一些错误配置?请让我知道。我是masstransit的新手。
问题1:我尝试使用x.Exchange.ExchangeName检查Publish方法中的交换名称,它给出的是“CreateCompanyPlaceholder”。但不管它是什么:ICreateEmployeeEto。
问题2:我试着评论使用的注射剂,删除绑定并尝试。
1条答案
按热度按时间7gyucuyw1#
当MassTransit将消息移动到 _skipped 队列时,这是因为接收端点上未使用该消息类型。请确保您的消息已正确创建,并且具有相同的类型和命名空间:
From the documentation。
MassTransit对消息协定使用完整的类型名称,包括命名空间。在两个单独的项目中创建相同的消息类型时,命名空间必须匹配,否则将不会使用消息。
除此之外,如果希望为某些消息类型使用不同的交换名称来生成和使用消息,则可以使用属性或消息拓扑配置(类似于上面指定的内容)来更改这些消息类型的交换名称。
更改应在生产者和消费者上指定,以确保一致性。此时,MassTransit将做正确的事情并为您绑定适当的交换。