rabbitmq MassTransit -使用分区程序-多个使用者

sqserrrh  于 2022-12-18  发布在  RabbitMQ
关注(0)|答案(1)|浏览(172)

我们最近遇到了一个问题,用户多次提交一个请求,因为我们在消费者内部使用工作单元模式,这些多个请求将在我们的数据库中创建重复的记录。
我们看到可以添加一个UsePartitioner方法,以便根据我们设置的ID对消息进行分区,这将使使用者等待,直到具有该分区ID的消息完成后才开始下一个分区。在我的Docker设置中,每个服务只运行一个容器,这似乎在本地工作得很好。但是,我注意到,当我们将其部署到其他环境时,我们仍然存在试图生成重复记录的问题。我想不出还有什么原因,除非我们的其他环境运行多个容器/消费者,分区只发生在单个消费者上,而不是共享?或者我们遗漏了其他设置?
我还要补充一点,我们使用的是Kubernetes,在我们的开发环境中,我们有4个Pod在运行,所以所有4个Pod都有这个消费者的示例。

public class TestConsumerDefinition : ConsumerDefinition<TestConsumer>
    {
        public TestConsumerDefinition()
        {
            ConcurrentMessageLimit = 20;
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<TestConsumer> consumerConfigurator)
        {
            var partitioner = consumerConfigurator.CreatePartitioner(ConcurrentMessageLimit.Value);

            consumerConfigurator.Message<TestMessage>(
                x =>
                    x.UsePartitioner(partitioner, m =>
                        $"{m.Message.DrugId}-{m.Message.PatientId}"));
        }
    }

谢谢你。

pu82cl6c

pu82cl6c1#

首先,没有一种工具可以在不同示例上的负载平衡消费者之间进行划分,您可以构建自己的分布式锁,但是......
最好的方法是确保您的消费者逻辑是幂等的。使用upsert,或者在添加数据之前检查数据是否已经存在。或者,为了获得额外的好处,添加适当的数据库级约束以防止重复(使用唯一约束或索引)。
原因是,即使是分区器也不能阻止一秒钟内两个请求有相同的数据,所以幂等运算在处理分布式系统时很重要。

相关问题