rabbitmq 将具有相同接口的项目分派到不同队列

tct7dpnv  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(176)

我正在使用RabbitMQ和MassTransit库在多服务架构中开发一个服务。
服务通过Consumer接收交易,根据过滤规则(在配置json文件中设置,通过Options导入到服务中),确定交易信息需要发送得地址,并将item发布到单独得队列中,以备将来发送。
在Consumer of Queue for sending中,我只是将数据发送到为该事务指定的地址。
现在需要批量发送数据。这里,带有Batch Consumer的MassTransit功能可以提供帮助。
但是调度也有困难。例如,Consumer接收到4个事务。其中2个需要发送到一个地址,另外2个需要发送到另一个地址。在代码中,我为每个地址创建了两个包含事务的数组,并尝试发送。如果两个数组都成功发送,则一切正常。如果两个数组都收到错误,则整个批处理将重试。这也很好。但是如果其中一个数组发送成功而另一个没有,那么整个批次都会重复。

实际问题是,是否可以为一个实体(使用一个接口)创建两个单独的队列,并根据规则分别向每个队列发送数据?或者是否有其他方法可以解决这个问题,即根据发送地址将事务分成批次?

nhn9ugyo

nhn9ugyo1#

是否可以为一个实体创建两个单独的队列
我想请你试着简化这个过程。如果架构是如此混乱,以至于读者花了30分钟才理解这个问题,那它就太复杂了。考虑一下在12个月的时间里支持这个代码。
但是,可以选择使用发送到批处理的批处理
第一个批处理读取一个自定义消息头(比如__filterby),将消息拆分到两个不同的队列(端点)中。
然后,代码根据逻辑重新批处理到一个专用的端点/消费者。这意味着一个端点/队列。下面是一些伪代码来解释我的意思。

public async Task Consume(ConsumeContext<Batch<OrderAudit>> context)
    {
        var arraya = Contect.Messages(m => m?.Headers?.filterby == 'arraya';
        ConsumeContext<IArrayA> a = arraya;
        // Send

        var arrayb = Contect.Messages(m => m?.Headers?.filterby == 'arrayb';
        ConsumeContext<IArrayB> b = arrayb;
        // send 
    }

而且,这感觉就像是让RabbitMQ Exchange基于Topic/routing_key将流量定向到多个队列。
可能有帮助的参考资料
https://masstransit-project.com/troubleshooting/common-gotchas.html#sharing-a-queue
https://masstransit-project.com/usage/producers.html#message-initializers
https://www.rabbitmq.com/tutorials/tutorial-five-python.html

相关问题