rabbitmq 在python / pika中使用多个队列

7bsow1i6  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(3)|浏览(211)

我正在尝试创建一个使用者,该使用者将订阅多个队列,然后在消息到达时对其进行处理。
问题是,当第一个队列中已经存在一些数据时,它会消耗第一个队列,而不会消耗第二个队列。但是,当第一个队列为空时,它会转到下一个队列,然后同时消耗这两个队列。
我第一次实现了线程化,但是我想避开它,当pika库为我做的时候,它没有太多的复杂性。下面是我的代码:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
jtjikinw

jtjikinw1#

一种可能的解决方案是使用非阻塞连接并使用消息。

import pika

def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)

def on_open(connection):
    connection.channel(on_open_callback=on_channel_open)

def on_channel_open(channel):
    channel.basic_consume(queue='queue1', on_message_callback=callback)
    channel.basic_consume(queue='queue2', on_message_callback=callback)

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

这将连接到多个队列,并相应地使用消息。

daolsyd0

daolsyd02#

此问题最有可能是第一个调用发出了Basic.Consume,并且在发出第二个调用之前已从预填充队列接收到消息。您可能希望尝试将QoS预取计数设置为1,这将限制RabbitMQ一次向您发送多条消息。

ckx4rj1h

ckx4rj1h3#

与上面第一个答案中的评论类似,我在pika 1.1.0和以下版本中得到了类似的结果:

import pika

def queue1_callback(ch, method, properties, body):
  print(" [x] Received queue 1: %r" % body)

def queue2_callback(ch, method, properties, body):
  print(" [x] Received queue 2: %r" % body)

def on_open(connection):
  connection.channel(on_open_callback = on_channel_open)

def on_channel_open(channel):
  channel.basic_consume('queue1', queue1_callback, auto_ack = True)
  channel.basic_consume('queue2', queue2_callback, auto_ack = True)

credentials = pika.PlainCredentials('u', 'p')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.SelectConnection(parameters = parameters, on_open_callback = on_open)

Try:
  connection.ioloop.start()
except KeyboardInterrupt:
  connection.close()
  connection.ioloop.start()

相关问题