RabbitMQ:如何设置并行消费者?

6tqwzwtp  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(141)

我想用多个消费者并行处理同一个队列,但我的方法不起作用。无论使用多少个消费者,它们似乎都会相互阻塞,处理输入队列所花费的时间始终相同:

import argparse
import json
import time

import pika

def make_connection() -> pika.BlockingConnection:
    """Create a blocking connection to the broker at localhost:5672.

    Authenticates with the plain credentials ``user`` / ``bitnami``.

    Returns:
        The ``pika.BlockingConnection`` built from those parameters.
    """
    return pika.BlockingConnection(
        parameters=pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials(username="user", password="bitnami"),
        )
    )

def publish(channel: pika.adapters.blocking_connection.BlockingChannel, data):
    """Send ``data`` to the "in" queue as a JSON-encoded message.

    Args:
        channel: Channel used to publish the message.
        data: JSON-serializable payload; it is wrapped as ``{"data": data}``.
    """
    payload = json.dumps({"data": data}).encode()
    # Empty exchange name, with "in" as the routing key.
    channel.basic_publish("", "in", payload)

def make_inp_callback(marker: str):
    """Build a consumer callback for the "in" queue, tagged with ``marker``.

    Args:
        marker: Label appended to each processed payload and printed with it.

    Returns:
        A callback taking ``(channel, method, properties, body)``. It sleeps
        for 0.2 s, appends ``" consumer <marker>"`` to the message's ``data``
        string, publishes the result to the "out" queue and then acks the
        incoming delivery.
    """
    def inp_callback(channel: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
        # Fixed delay, presumably standing in for real per-message work.
        time.sleep(.2)

        incoming = json.loads(body)
        tagged = {"data": incoming["data"] + " consumer " + marker}
        print(f"{marker}: {tagged}")

        channel.basic_publish("", "out", json.dumps(tagged).encode())
        # Ack only after the result has been handed to the channel.
        channel.basic_ack(method.delivery_tag)

    return inp_callback

def make_outp_callback(n):
    """Build a consumer callback for the "out" queue.

    Args:
        n: Total number of messages; the message whose ``data`` starts with
            ``str(n - 1)`` is treated as the last one.

    Returns:
        A callback taking ``(channel, method, properties, body)``. It prints
        the decoded message, acks it, and closes the channel once the first
        space-separated token of ``data`` equals ``str(n - 1)``.
    """
    def outp_callback(channel: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
        decoded = json.loads(body)
        print(decoded)
        channel.basic_ack(method.delivery_tag)

        # The leading token of the payload is the original message index.
        index_token = decoded["data"].partition(" ")[0]
        if index_token == str(n - 1):
            channel.close()

    return outp_callback

def set_up_consumers(channel, n):
    """Register ``n`` consumers of the "in" queue on ``channel``.

    Consumer ``i`` gets a callback built by ``make_inp_callback`` with the
    marker ``"callback <i>"``.

    Args:
        channel: Channel on which every consumer is registered.
        n: Number of consumers to register.
    """
    # NOTE: all consumers share the one channel passed in; no threads are
    # started here.
    for index in range(n):
        channel.basic_consume("in", make_inp_callback(f"callback {index}"))

def parse_args():
    """Parse the command-line arguments.

    Returns:
        An ``argparse.Namespace`` with two integer attributes, in positional
        order: ``n_consumers`` and ``n_messages``.
    """
    parser = argparse.ArgumentParser()
    for positional in ("n_consumers", "n_messages"):
        parser.add_argument(positional, type=int)
    return parser.parse_args()

def main():
    """Publish test messages, consume them, and print the elapsed time.

    Declares the "in" and "out" queues, publishes ``n_messages`` messages
    (``"0"`` .. ``str(n_messages - 1)``) to "in", registers ``n_consumers``
    consumers on "in" plus one consumer on "out", and then consumes until the
    "out" callback closes the channel. The connection is always closed on
    exit, including when an error occurs.
    """
    args = parse_args()

    connection = make_connection()
    try:
        channel = connection.channel()
        channel.basic_qos(prefetch_count=4)
        channel.queue_declare("in")
        channel.queue_declare("out")

        for i in map(str, range(args.n_messages)):
            publish(channel, i)

        # perf_counter is monotonic, so the measured interval is not affected
        # by system clock adjustments.
        start = time.perf_counter()

        # With no messages the "out" callback's stop condition can never be
        # met, so consuming would block forever; skip it in that case.
        if args.n_messages > 0:
            set_up_consumers(channel, args.n_consumers)
            channel.basic_consume("out", make_outp_callback(args.n_messages))

            # Dispatches callbacks on this thread; the "out" callback closes
            # the channel after the last message, which ends consumption.
            channel.start_consuming()

        elapsed = time.perf_counter() - start
        print(
            f"time processing {args.n_messages} messages with {args.n_consumers} consumers: {elapsed}"
        )
    finally:
        # Release the broker connection even if something above failed.
        if connection.is_open:
            connection.close()

if __name__ == "__main__":
    main()
-> % py multi_consumer_test.py 5 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
callback 1: {'data': '4 consumer callback 1'}
callback 1: {'data': '5 consumer callback 1'}
callback 1: {'data': '6 consumer callback 1'}
callback 1: {'data': '7 consumer callback 1'}
callback 2: {'data': '8 consumer callback 2'}
callback 2: {'data': '9 consumer callback 2'}
{'data': '0 consumer callback 0'}
{'data': '1 consumer callback 0'}
{'data': '2 consumer callback 0'}
{'data': '3 consumer callback 0'}
{'data': '4 consumer callback 1'}
{'data': '5 consumer callback 1'}
{'data': '6 consumer callback 1'}
{'data': '7 consumer callback 1'}
{'data': '8 consumer callback 2'}
{'data': '9 consumer callback 2'}
-> % py multi_consumer_test.py 1 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
{'data': '0 consumer callback 0'}
callback 0: {'data': '4 consumer callback 0'}
{'data': '1 consumer callback 0'}
callback 0: {'data': '5 consumer callback 0'}
{'data': '2 consumer callback 0'}
callback 0: {'data': '6 consumer callback 0'}
{'data': '3 consumer callback 0'}
callback 0: {'data': '7 consumer callback 0'}
{'data': '4 consumer callback 0'}
callback 0: {'data': '8 consumer callback 0'}
{'data': '5 consumer callback 0'}
callback 0: {'data': '9 consumer callback 0'}
{'data': '6 consumer callback 0'}
{'data': '7 consumer callback 0'}
{'data': '8 consumer callback 0'}
{'data': '9 consumer callback 0'}
time processing 10 messages with 1 consumers: 2.043266773223877

我的设置有什么错误?

pgvzfuti

pgvzfuti1#

Pika 不是线程安全的,其内部也不使用任何线程,因此在同一个连接上注册的多个消费者回调只会依次执行,而不会并行。在这种情况下,要让多个消费者真正并行,正确的做法是为每个消费者启动一个线程(Thread),每个线程创建并使用各自独立的连接进行消费。

相关问题