rabbitmq 一次使用多封邮件

8ulbf1ek  于 2022-11-23  发布在  RabbitMQ
关注(0)|答案(2)|浏览(132)

我正在使用一个外部服务(Service)来处理一些特定类型的对象。如果我以10个为一批发送对象,那么Service的工作速度会更快。我目前的架构如下。生产者逐个广播对象,一群消费者从队列中(逐个)提取对象并将其发送到Service。这显然不是最优的。
我不想修改生产者代码,因为它可以在不同的情况下使用。我可以修改消费者代码,但代价是增加复杂性。我也知道prefetch_count选项,但我认为它只在网络级别工作--客户端库(pika)不允许在消费者回调中一次提取多个消息。
那么,**RabbitMQ可以在将消息发送给使用者之前创建一批消息吗?**我正在寻找类似"一次使用 * n * 条消息"的选项。

mi7gmzs6

mi7gmzs61#

您不能在使用者回调中对消息进行批处理,但可以使用线程安全库并使用多个线程来使用数据。这样做的好处是,您可以在五个不同的线程上获取五条消息,并在需要时组合数据。
作为一个例子,你可以看看我将如何使用我的AMQP库实现这一点。https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_consumer.py

b5buobof

b5buobof2#

下面的代码将使用channel.consume来开始使用消息。当达到所需的消息数量时,我们将退出/停止。
我已经设置了一个batch_size来防止一次拉取大量的消息。你可以随时改变batch_size来满足你的需要。

def consume_messages(queue_name: str):
    msgs = list([])
    batch_size = 500

    q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
    q_length = q.method.message_count
    
    if not q_length:
        return msgs

    msgs_limit = batch_size if q_length > batch_size else q_length

    try:
        # Get messages and break out
        for method_frame, properties, body in channel.consume(queue_name):

            # Append the message
            try:
                msgs.append(json.loads(bytes.decode(body)))
            except:
                logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")

            # Acknowledge the message
            channel.basic_ack(method_frame.delivery_tag)

            # Escape out of the loop when desired msgs are fetched
            if method_frame.delivery_tag == msgs_limit:

                # Cancel the consumer and return any pending messages
                requeued_messages = channel.cancel()
                print('Requeued %i messages' % requeued_messages)
                break

    except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
        logger.info(f'Connection Interrupted: {str(e)}')

    finally:
        # Close the channel and the connection
        channel.stop_consuming()
        channel.close()

    return msgs

相关问题