我想并行处理一个有多个消费者的队列。但我的方法不起作用。无论我使用多少消费者,它们似乎都会相互阻塞,并且处理输入队列总是花费相同的时间,无论消费者的数量如何:
import argparse
import json
import time
import pika
def make_connection() -> pika.BlockingConnection:
credentials = pika.PlainCredentials(username="user", password="bitnami")
kwargs = {'host': 'localhost', 'port': 5672, 'credentials': credentials}
parameters = pika.ConnectionParameters(**kwargs)
connection = pika.BlockingConnection(parameters=parameters)
return connection
def publish(channel: pika.adapters.blocking_connection.BlockingChannel, data):
message = {"data": data}
message = json.dumps(message).encode()
channel.basic_publish("", "in", message)
def make_inp_callback(marker: str):
def inp_callback(channel: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
time.sleep(.2)
message = {"data": json.loads(body)["data"] + " consumer " + marker}
print(f"{marker}: {message}")
message = json.dumps(message).encode()
channel.basic_publish("", "out", message)
channel.basic_ack(method.delivery_tag)
return inp_callback
def make_outp_callback(n):
def outp_callback(channel: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
body = json.loads(body)
print(body)
channel.basic_ack(method.delivery_tag)
if body["data"].split(" ")[0] == str(n - 1):
channel.close()
return outp_callback
def set_up_consumers(channel, n):
for i in range(n):
marker = f"callback {i}"
cb = make_inp_callback(marker)
channel.basic_consume("in", cb)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("n_consumers", type=int)
parser.add_argument("n_messages", type=int)
args = parser.parse_args()
return args
def main():
args = parse_args()
connection = make_connection()
channel = connection.channel()
channel.basic_qos(prefetch_count=4)
channel.queue_declare("in")
channel.queue_declare("out")
for i in map(str, range(args.n_messages)):
publish(channel, i)
start = time.time()
set_up_consumers(channel, args.n_consumers)
channel.basic_consume("out", make_outp_callback(args.n_messages))
channel.start_consuming()
print(
f"time processing {args.n_messages} messages with {args.n_consumers} consumers: {time.time() - start}"
)
if __name__ == "__main__":
main()
-> % py multi_consumer_test.py 5 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
callback 1: {'data': '4 consumer callback 1'}
callback 1: {'data': '5 consumer callback 1'}
callback 1: {'data': '6 consumer callback 1'}
callback 1: {'data': '7 consumer callback 1'}
callback 2: {'data': '8 consumer callback 2'}
callback 2: {'data': '9 consumer callback 2'}
{'data': '0 consumer callback 0'}
{'data': '1 consumer callback 0'}
{'data': '2 consumer callback 0'}
{'data': '3 consumer callback 0'}
{'data': '4 consumer callback 1'}
{'data': '5 consumer callback 1'}
{'data': '6 consumer callback 1'}
{'data': '7 consumer callback 1'}
{'data': '8 consumer callback 2'}
{'data': '9 consumer callback 2'}
-> % py multi_consumer_test.py 1 10
callback 0: {'data': '0 consumer callback 0'}
callback 0: {'data': '1 consumer callback 0'}
callback 0: {'data': '2 consumer callback 0'}
callback 0: {'data': '3 consumer callback 0'}
{'data': '0 consumer callback 0'}
callback 0: {'data': '4 consumer callback 0'}
{'data': '1 consumer callback 0'}
callback 0: {'data': '5 consumer callback 0'}
{'data': '2 consumer callback 0'}
callback 0: {'data': '6 consumer callback 0'}
{'data': '3 consumer callback 0'}
callback 0: {'data': '7 consumer callback 0'}
{'data': '4 consumer callback 0'}
callback 0: {'data': '8 consumer callback 0'}
{'data': '5 consumer callback 0'}
callback 0: {'data': '9 consumer callback 0'}
{'data': '6 consumer callback 0'}
{'data': '7 consumer callback 0'}
{'data': '8 consumer callback 0'}
{'data': '9 consumer callback 0'}
time processing 10 messages with 1 consumers: 2.043266773223877
我的设置有什么错误?
1条答案
按热度按时间pgvzfuti1#
Pika不是线程安全的,它内部也不使用任何线程。在这种情况下,拥有多个消费者的正确方法是为每个连接启动一个
Thread
,并在每个线程上消费。