如何使用Pika和RabbitMQ的多线程来执行请求和响应RPC消息

gv8xihay  于 2023-03-30  发布在  RabbitMQ
关注(0)|答案(1)|浏览(183)

我正在用RabbitMQ做一个项目。我使用RPC模式,基本上我是从队列接收或消费消息,进行一些处理,然后发送一个响应回来。我使用Pika,我的目标是每个任务使用一个线程,所以对于每个任务,我会为该任务专门创建一个线程。我还读到最佳实践是只建立一个连接,并根据我的需要建立许多通道,但我总是得到这样的错误:
pika.exceptions.RecursionError:不能从另一个BlockingConnection或BlockingChannel回调的范围调用start_consuming。
我做了一些研究,发现Pika不是线程安全的,我们应该为每个线程使用一个独立的连接和一个通道。但我不想这样做,因为这被认为是不好的做法。所以我想问这里是否有人已经实现了这一点。
我还读到,如果我没有使用BlockingConnection来示例化我的连接,也有一个名为add_callback_threadsafe的函数可以实现这一点。但不幸的是,没有示例,我读了文档,但它很复杂,没有示例,我很难理解他们想要描述的内容。
我的尝试是声明两个类。每个类将代表一个任务Executer,它从队列中接收或消费消息,并在此基础上进行一些处理并返回响应。我的想法是在两个任务之间共享RabbitMQ连接,但每个任务都将获得一个独立的通道。
在上面的代码中,传递给函数的rabbit参数是一个类,它包含一些变量(如connection)和其他函数(如EventSubscriber),当调用时,它将分配一个新的Channel并开始从该特定交换和routingKey消费消息。接下来,我声明一个Thread并给予subscribe或consume函数作为该线程的Target。其他任务类看起来也与此Class相同,它是“这就是为什么我只上传这段代码的原因。在主类中,我建立了一个到RabbitMQ的连接,并将其作为参数传递给两个任务类的构造函数。

class On_Deregistration:
 
    def __init__(self, rabbit):
       self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq

    def event(self, rabbit):
     
        self.Subscriber = rabbit.EventSubscriber(rabbit,  'testing.test',  'test', False,  onDeregistrationFromHRS # this func is task listener)

    def subscribeAsync(self):
        self.Subscriber.subscribe() # here i call start_consuming

    def start(self):
        """start Subscribtion in an Independant Thread  """
        thread = threading.Thread(target = self.subscribeAsync )  
        thread.start()
        if thread.isAlive():
            print("asynchronous subscription started")
# Main Class:
class App:

    def __init__(self):
    
        self.rabbitMq = RabbitMqCommunicationInterface(host='localhost', port=5672)
        firstTask =  On_Deregistration(self.rabbitMq)
        secondTask =  secondTask(self.rabbitMq)

app = App()

我搜索了这个错误的原因,显然Pika不是线程安全的,但必须有一个解决方案。也许不使用BlockingConnection?也许我错过了一些关于如何使用RabbitMQ实现多线程的东西。

np8igboo

np8igboo1#

所以经过长时间的研究,我发现Pika不是线程安全的。至少在目前,也许在新版本中它会是线程安全的。所以现在对于我的项目,我停止使用Pika,我使用b-rabbit,这是Rabbitpy的线程安全 Package 器。但我必须说Pika是一个很棒的库,我发现它的API比rabbitpy更好地描述和结构化,但对于我的项目来说,它必须使用多线程,这是一个很好的例子。这就是为什么皮卡目前是一个糟糕的选择。我希望这对未来的人有所帮助

相关问题