RabbitMQ中的队列消息堆叠并等待单个消费者,而其他消费者可用。
我们使用RabbitMQ作为信使服务。我们使用bunny来创建连接,并通过传递消息来设置队列。
在我们使用bunny的rails设置中,我们遇到了一个问题,即我们有一个队列,其中有8个消费者在该队列上侦听消息。当消息进来时,理想情况下,它们应该是围绕消费者的例子:队列中有4条消息,消费者1拾取消息1,消费者1是忙碌,消费者2拾取消息2,消费者2是忙碌,消费者3拾取消息3,依此类推。
然而,我们遇到的问题是,队列中有4条消息,消费者1拾取消息1,消费者1忙碌,消费者2-8可用,但消息2-4堆叠在队列中等待消费者1变得可用并处理消息。
我觉得我已经做了大量的研究,只是不能完全弄清楚如何阻止消息堆叠和等待一个单一的消费者。
有没有人在这方面有经验,或者有任何关于如何解决这个问题的想法?
conn = Bunny.new(bunny[0])
conn.start
ch = conn.create_channel
q = ch.queue("#{record_queue_name}", :durable => true)
q.subscribe(:manual_ack => true, :arguments => {"x-priority" => 10}, :block => true) do |delivery_info, properties, payload|
ch.acknowledge(delivery_info.delivery_tag, false)
我们希望任何消息发送到RabbitMQ时,消费者都能先到先得,而不是在其他人可用时堆叠多个消息等待一个忙碌消费者。
编辑:如何复制:三个消费者同时购买。推送6条消息-消费者1 - 3现在忙碌忙,队列中有3条消息。重新启动2和3,当2和3再次侦听时,3条消息仍在队列中等待消费者1。消费者2和3仍然可用。
重新启动消费者1,现在3个队列的消息首先到达新重新启动的消费者2和3的第一个服务器。
我需要消息先来先服务器,无论消费者重新启动。
1条答案
按热度按时间cs7cruho1#
RabbitMQ正在按预期工作。
由于您的代码没有设置QoS / prefetch,RabbitMQ将所有六个消息发送给您的第一个消费者。由于该消费者需要时间来确认消息(在代码中模拟45秒的睡眠),这六个消费者保持在“Unacked”状态,而另外两个消费者没有任何工作要做。重新启动另外两个消费者没有任何效果,因为所有六条消息都处于“Unacked”状态,等待来自第一个消费者的确认。
当您重新启动第一个消费者时,RabbitMQ会检测到连接丢失,并将六条消息排队到“Ready”状态,并将所有六条消息(很可能)传递给另一个消费者,然后问题重复出现。
请参阅this runnable code sample了解如何设置预取。通过预取
1
,RabbitMQ将最多向消费者发送一条消息,并在向该消费者发送另一条消息之前等待ACK。通过这种方式,消息将在您的消费者中分发。