如何使用pika使用RabbitMQ队列中的所有消息

iovurdzv  于 2022-11-23  发布在  RabbitMQ
关注(0)|答案(2)|浏览(290)

我想用Python编写一个守护进程,它周期性地唤醒来处理RabbitMQ队列中的一些数据。
当守护进程醒来时,它应该使用队列中的所有消息(或者min(len(queue), N),其中N是某个任意数字),因为这样更适合批量处理数据。与传入在每个消息到达时调用的回调相反,在pika中是否有方法可以做到这一点?

  • 谢谢-谢谢
of1yzvn4

of1yzvn41#

您可以使用basic.get API,它从代理中提取消息,而不是订阅要推送的消息

zi8p0yeb

zi8p0yeb2#

下面是使用pika编写的代码。
下面的代码将使用channel.consume来开始使用消息。当达到所需的消息数量时,我们将中断/停止。
我已经设置了一个batch_size来防止一次拉取大量的消息。你可以随时改变batch_size来满足你的需要。

from pika import BasicProperties, URLParameters
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.exceptions import ChannelWrongStateError, StreamLostError, AMQPConnectionError
from pika.exchange_type import ExchangeType
import json

    def consume_messages(queue_name: str):
    msgs = list([])
    batch_size = 500

    q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
    q_length = q.method.message_count
    
    if not q_length:
        return msgs

    msgs_limit = batch_size if q_length > batch_size else q_length

    try:
        # Get messages and break out
        for method_frame, properties, body in channel.consume(queue_name):

            # Append the message
            try:
                msgs.append(json.loads(bytes.decode(body)))
            except:
                logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")

            # Acknowledge the message
            channel.basic_ack(method_frame.delivery_tag)

            # Escape out of the loop when desired msgs are fetched
            if method_frame.delivery_tag == msgs_limit:

                # Cancel the consumer and return any pending messages
                requeued_messages = channel.cancel()
                print('Requeued %i messages' % requeued_messages)
                break

    except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
        logger.info(f'Connection Interrupted: {str(e)}')

    finally:
        # Close the channel and the connection
        channel.stop_consuming()
        channel.close()

    return msgs

相关问题