Rabbitmq - start_consuming不能从另一个BlockingConnection或BlockingChannel回调的作用域调用

jdg4fx2g  于 12个月前  发布在  RabbitMQ
关注(0)|答案(1)|浏览(137)

我有两个python进程,一个消费者进程和一个生产者进程。每个进程将启动一个rabbitmq连接并产生多个消费者/生产者线程。每个线程将在连接中创建一个通道并执行消息发送和接收逻辑。
这是我的消费线索

def consumer_thread(connection, routing_key):
    channel = connection.channel()
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange="test", routing_key=routing_key, queue=queue_name)
    thread_name = current_thread().name

    def process(ch, method, properties, body):
        print(f"{thread_name} received {body}")

    channel.basic_consume(process, queue=queue_name, no_ack=True)
    channel.start_consuming()

字符串
这是我的制作人帖子

def producer_thread(connection, routing_key, sleep_time):
    channel = connection.channel()
    thread_name = current_thread().name
    count = 0

    while True:
        count += 1
        channel.basic_publish("test", routing_key=routing_key,
                              body=f"msg {count} from {thread_name}")
        time.sleep(sleep_time)


然后我启动一个rabbitmq连接,
第一个月
然而,当我运行我的代码时,对于消费者线程接收到的第一条消息,我得到了这个错误消息

Traceback (most recent call last):
  File "D:\app\cortex-bin\Python36\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "D:\app\cortex-bin\Python36\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "D:\app\cortex\background\core\scratch\test.py", line 18, in consumer_thread
    channel.start_consuming()
  File "D:\app\cortex-bin\Python36\lib\site-packages\pika\adapters\blocking_connection.py", line 1817, in start_consuming
    'start_consuming may not be called from the scope of 'pika.exceptions.RecursionError: start_consuming may not be called from the scope of another BlockingConnection or BlockingChannel callback'


对于所有后续消息,它们都可以被消费者线程很好地接收。
我能知道是什么导致了这个异常吗?谢谢。

xbp102n0

xbp102n01#

您不能从多个线程访问Pika连接(注解)。您的线程必须启动自己的连接和通道。

相关问题