python-3.x 如何使用aio-pika从多个队列获取消息

lg40wkob  于 2022-12-27  发布在  Python
关注(0)|答案(1)|浏览(497)

我刚刚开始使用RabbitMQ,使用了aio-pika,并且我有多个队列名称要使用。到目前为止,我在worker.py中使用了本教程,但使用了多个队列声明,因此看起来如下所示:

import asyncio
from aio_pika import connect    

async def main() -> None:
        # Perform connection
        connection = await connect(Settings.RABBIT_URL)
    
        async with connection:
            # Creating a channel
            channel = await connection.channel()
            await channel.set_qos(prefetch_count=0)
    
            # Declaring queue
            queue = await channel.declare_queue(
                "queue_1",
                durable=True,
            )
    
            queue2 = await channel.declare_queue(
                "queue_2",
                durable=True,
            )
    
            queue3 = await channel.declare_queue(
                "queue_3",
                durable=True,
            )
    
            # Start listening the queue with name 'task_queue'
            await queue.consume(on_message)
            await queue2.consume(on_message)
            await queue3.consume(on_message)
    
            print(" [*] Waiting for messages. To exit press CTRL+C")
            await asyncio.Future()

问题是,我需要使它灵活地声明队列,数量与我可以从数据库中获取的队列名称一样多,因此,首先是我声明多个队列的方法是否正确,其次是如何基于队列名称列表声明队列?
谢谢你。

wbgh16ku

wbgh16ku1#

好了,经过反复试验,基本上我声明队列的方法是正确的,下一个关于声明队列和动态消费的问题可以通过使用循环查询结果和使用locals()动态初始化变量来完成。

queues = dict()
   for q in range(0, len(query)):
      queue_name = query[q]['result']
      locals()['queues_{0}'.format(q)] = await channel.declare_queue(queue_name, durable=True)
      await locals()['queues_{0}'.format(q)].consume(on_message)

相关问题