RabbitMQ -消息传递顺序

sigwle7e  于 2022-12-23  发布在  RabbitMQ
关注(0)|答案(4)|浏览(205)

我需要为我的新项目选择一个新的队列代理。
这一次我需要一个支持pub/sub的可伸缩队列,并且保持消息排序是必须的。
我读了亚历克西斯的评论:他写道:
“事实上,我们认为RabbitMQ提供了比Kafka更强的秩序”
我读了rabbitmq文档中的消息排序部分:
“可以使用AMQP方法将消息返回到队列,AMQP方法具有一个queue参数(基本.恢复,基本.拒绝和基本.否定),或由于通道在保留未确认消息时关闭...在发行版2.7.0和更高版本中,如果队列有多个订户,则单个使用者仍有可能观察到消息顺序不对。这是由于其他订户可能对消息进行重新排队的操作造成的。从队列的Angular 来看,消息总是按照发布顺序保存。”
如果我需要按顺序处理消息,我只能使用rabbitMQ,每个使用者都有一个独占队列?
RabbitMQ仍然被认为是有序消息队列的好解决方案吗?

ugmeyewa

ugmeyewa1#

好吧,让我们仔细看看你上面描述的场景,我认为在你的问题的片段之前粘贴the documentation是很重要的,以提供上下文:
AMQP 0-9-1核心规范第4.7节解释了保证订购的条件:在一个通道中发布的消息,通过一个交换、一个队列和一个输出通道,将按照它们被发送的相同顺序被接收。2 RabbitMQ从2.7.0版本开始提供了更强的保证。
可以使用具有queue参数的AMQP方法将消息返回到队列(basic.recover、basic.reject和basic.nack),或由于通道在保留未确认消息时关闭。对于2.7.0之前的RabbitMQ版本,这些情况都会导致消息在队列的后面重新排队。从RabbitMQ版本2.7.0开始,消息始终按发布顺序保留在队列中,* 即使在存在重新排队或通道关闭的情况下。*(着重部分是附加的)
因此,很明显,RabbitMQ从2.7.0开始,在消息排序方面对原始AMQP规范进行了相当大的改进。

对于多个(并行)使用者,无法保证处理顺序。

第三段(粘贴在问题中)接着给予了免责声明,我将转述如下:“如果队列中有多个处理器,就不能保证消息按顺序处理。”2他们在这里说的是RabbitMQ不能违背数学定律。
假设一家银行有一队顾客,这家银行以按照顾客进入银行的顺序为顾客提供服务而自豪,顾客排着队,由3个出纳员中的下一个为他们服务。
今天早上,碰巧三个出纳员同时有空,接着的三个顾客又来了。突然,三个出纳员中的第一个得了重病,无法完成对排队的第一个顾客的服务。这时,出纳员2已经完成了对顾客2的服务,出纳员3已经开始为顾客3服务了。
现在,两种情况中的一种可能发生。(1)排队的第一个顾客可以回到排队的最前面,或者(2)第一个客户可以抢占第三个客户,导致柜员停止处理第三个客户,开始处理第一个客户。RabbitMQ和我所知道的任何其他消息代理都不支持这种抢占逻辑。第一个客户实际上并没有最先得到帮助--第二个客户得到了帮助,因为他足够幸运地找到了一个优秀、快速的出纳员。保证客户得到帮助的唯一方法是让一个出纳员一次帮助一个客户,这将给银行带来重大的客户服务问题。
我希望这有助于说明您所问的问题。如果您有多个使用者,则不可能确保消息在每种可能的情况下都得到有序处理。如果您有多个队列、多个独占使用者、不同的代理,等等--没有办法事先保证消息会被多个消费者按顺序应答,但是RabbitMQ会尽最大努力。

8zzbczxx

8zzbczxx2#

Kafka保留了消息排序,但只在分区内而不是全局。如果你的数据需要全局排序和分区,这确实会使事情变得困难。然而,如果你只需要确保同一用户的所有相同事件,等等......都在同一个分区结束,以便它们被正确排序,你可以这样做。生产者负责它们写入的分区,因此,如果您能够对数据进行逻辑分区,那么这可能是更可取。

cbjzeqam

cbjzeqam3#

我觉得这个问题有两个不一样的地方,消费秩序和加工秩序。
消息队列可以在一定程度上保证消息将按顺序使用,但是它们不能保证消息的处理顺序。
这里的主要区别在于,消息处理的某些方面无法在消费时确定,例如:

  • 如前所述,使用者可能在处理时失败,这里消息的使用顺序是正确的,但是使用者未能正确处理它,这将使它返回队列。此时,使用顺序是完整的,但处理顺序不是。
  • 如果“处理”是指消息现在已被丢弃并完全完成处理,那么请考虑处理时间不是线性的情况,换句话说,处理一条消息比处理另一条消息花费更长的时间。例如,如果处理消息3比通常花费更长的时间,则消息4和5可能会在消息3之前被使用并完成处理。

因此,即使您设法使消息回到队列的前面(顺便说一下,这违反了使用顺序),您仍然不能保证它们也将按顺序处理。
如果要按顺序处理消息:
1.始终只有一个使用者示例,或者只有一个主使用者和几个备用使用者。
1.或者,不要使用消息队列,而使用同步阻塞方法进行处理,这听起来可能不太好,但在许多情况下和业务需求中,它是完全有效的,有时甚至是关键使命。

osh3o9ms

osh3o9ms4#

有适当的方法来保证RabbitMQ订阅中消息的顺序。
如果您使用多个使用者,他们将使用共享的ExecutorService处理消息。另请参阅ConnectionFactory.setSharedExecutor(...)。您可以设置Executors.newSingleThreadExecutor()
如果将一个Consumer与一个队列一起使用,则可以使用多个 bindingKeys(它们可以有通配符)绑定此队列。消息将按照消息代理接收到消息的顺序放入队列中。
例如,您有一个发布者,它发布的消息的顺序很重要:

try (Connection connection2 = factory.newConnection();
        Channel channel2 = connection.createChannel()) {
    // publish messages alternating to two different topics
    for (int i = 0; i < messageCount; i++) {
        final String routingKey = i % 2 == 0 ? routingEven : routingOdd;
        channel2.basicPublish(exchange, routingKey, null, ("Hello" + i).getBytes(UTF_8));
    }
}

现在,您可能希望按照消息发布的顺序接收来自 * 队列 * 中两个 * 主题 * 的消息:

// declare a queue for the consumer
final String queueName = channel.queueDeclare().getQueue();

// we bind to queue with the two different routingKeys
final String routingEven = "even";
final String routingOdd = "odd";
channel.queueBind(queueName, exchange, routingEven);
channel.queueBind(queueName, exchange, routingOdd);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) { ... });

Consumer现在将按照消息发布的顺序接收消息,而不管您使用了不同的 * 主题 *。
RabbitMQ文档中有一些不错的5分钟教程,可能会有所帮助:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

相关问题