在使用RabbitMQ和Masstransit时,已发布的消息将被跳过

9avjhtql  于 2023-04-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(201)

我有两个应用程序,一个用于发布,另一个用于订阅。
让我们看看用于注册masstransit的发布代码。

context.Services.AddMassTransit<IEmployeeSvcBus>(x =>
            {                
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host("rabbitmq://localhost:5672");

                    cfg.Message<ICreateEmployeeEto>(m => m.SetEntityName("CreateEmployee"));
                    cfg.Message<IUpdateEmployeeEto>(m => m.SetEntityName("UpdateEmployee"));
                    
                    cfg.Publish<ICreateEmployeeEto>(y =>
                    {
                        y.ExchangeType = ExchangeType.FanOut.ToString().ToLower();
                        y.AutoDelete = false;
                        y.Durable = true;
                    });

                    cfg.Publish<IUpdateEmployeeEto>(y =>
                    {
                        y.ExchangeType = ExchangeType.FanOut.ToString().ToLower();
                        y.AutoDelete = false;
                        y.Durable = true;
                    });                    
                });
            });

我们使用多个总线,因此我使用IEmployeeSvcBus作为这个总线。

public interface IEmployeeSvcBus: IBus
{
}

对于上面的配置,我的意思是说,ICreateEmployeeEto和IUpdateEmployeeEto的消息类型应该分别发布到名为CreateEmployee和UpdateEmployee的交换机。
问题1:存在使用以下项创建的交换:<namespace>:ICreateEmployeeEto<namespace>:IUpdateEmployeeEto
目前,我在RabbitMQ中手动将它们分别绑定到CreateEmployeeUpdateEmployee
现在让我们来看看消费者应用程序的配置,同时也看看消费者类。

context.Services.AddMassTransit<IEmployeeSvcBus>(x =>
            {
                x.AddConsumer<CreateEmployeeConsumer>();
                x.AddConsumer<UpdateEmployeeConsumer>();

                x.UsingRabbitMq((context, busCfg) =>
                {
                    busCfg.Host("rabbitmq://localhost:5672");
                    
                    busCfg.ReceiveEndpoint("employees.create_dev", ep =>
                    {
                        ep.Bind("CreateEmployee");
                        ep.ConfigureConsumeTopology = false;
                        ep.Durable = true;
                        ep.Lazy = true;
                        ep.Consumer<CreateEmployeeConsumer>();
                    });

                    busCfg.ReceiveEndpoint("employees.update_dev", ep =>
                    {
                        ep.Bind("UpdateEmployee");
                        ep.ConfigureConsumeTopology = false;
                        ep.Durable = true;
                        ep.Lazy = true;
                        ep.Consumer<UpdateEmployeeConsumer>();
                    });
                });
            });

另请参阅消费者定义:

public class CreateEmployeeConsumer: IConsumer<ICreateEmployeeEto>
    {
        private readonly IBus _localBus;
        private readonly ImySvc _mySvc;

        public CreateEmployeeConsumer(
            IBus localBus,
            ImySvc mySvc)
        {
            _localBus = localBus;
            _mySvc= mySvc;
        }

        public CreateEmployeeConsumer()
        {
            //For Bus Registration
        }

        public async Task Consume(ConsumeContext<ICreateEmployeeEto> context)
        {
            //Some operations.
        }
    }

问题2:当我发布CreateEmployeeEto时,它将进入employees.create_dev_skipped队列。
我如何检查可能导致问题的原因?我错过了代码中的一些错误配置?请让我知道。我是masstransit的新手。
问题1:我尝试使用x.Exchange.ExchangeName检查Publish方法中的交换名称,它给出的是“CreateCompanyPlaceholder”。但不管它是什么:ICreateEmployeeEto。
问题2:我试着评论使用的注射剂,删除绑定并尝试。

7gyucuyw

7gyucuyw1#

当MassTransit将消息移动到 _skipped 队列时,这是因为接收端点上未使用该消息类型。请确保您的消息已正确创建,并且具有相同的类型和命名空间:
From the documentation
MassTransit对消息协定使用完整的类型名称,包括命名空间。在两个单独的项目中创建相同的消息类型时,命名空间必须匹配,否则将不会使用消息。
除此之外,如果希望为某些消息类型使用不同的交换名称来生成和使用消息,则可以使用属性或消息拓扑配置(类似于上面指定的内容)来更改这些消息类型的交换名称。
更改应在生产者和消费者上指定,以确保一致性。此时,MassTransit将做正确的事情并为您绑定适当的交换。

相关问题