Multiprocessing RabbitMQ queue consumers with Pika

wlzqhblo · posted 2023-10-20 in RabbitMQ

I'm using RabbitMQ queues and want to run multiple consumer object instances from a single program. Below is the Operator class that creates one producer and one consumer:

import pika  # ConfigurationManager used below is assumed to come from the asker's own project


class Operator(object):

    def __init__(self, delegate: callable, identifier):
        """
        Create a new instance of the Operator and initialize the connections
        """
        self._queue_details = self._get_queue_details()
        self._host_ip = self._queue_details['IP']
        self._port = self._queue_details['Port']
        self._username = self._queue_details['Username']
        self._password = self._queue_details['Password']
        self._input_queue_name = self._queue_details['ReadQueueName']
        self._output_queue_name = self._queue_details['WriteQueueName']
        self._error_queue_name = self._queue_details['ErrorQueueName']
        self._delegate = delegate
        self._identifier = identifier
        self._queue_connection = None
        self._input_channel = None
        self._output_channel = None
        self._error_channel = None
        self.is_busy = False
        self.mark_to_terminate = False

    def __del__(self):
        # close the connection, if it was ever opened
        if self._queue_connection is not None:
            self._queue_connection.close()

    @staticmethod
    def _initialize_channel(connection, queue_name, durable):
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=durable)
        return channel

    @staticmethod
    def _get_queue_details() -> dict:
        return ConfigurationManager().get_value('queueDetails')

    @staticmethod
    def _get_connection(username, password, host_ip, port):
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            credentials=pika.PlainCredentials(username, password), host=host_ip, port=port))
        return connection

    def initialize_operator(self):
        connection = self._get_connection(self._username, self._password, self._host_ip, self._port)
        self._queue_connection = connection
        self._input_channel = self._initialize_channel(connection, self._input_queue_name, durable=False)
        self._output_channel = self._initialize_channel(connection, self._output_queue_name, durable=True)
        self._error_channel = self._initialize_channel(connection, self._error_queue_name, durable=True)

    def consume(self):
        self._input_channel.basic_qos(prefetch_count=1)
        self._input_channel.basic_consume(self._process_incoming_message, queue=self._input_queue_name)
        self._input_channel.start_consuming()

    def _push_to_queue(self, channel, response):
        channel.basic_publish(exchange='', routing_key=self._output_queue_name, body=response,
                                properties=pika.BasicProperties(delivery_mode=2))  # make message persistent

    def _process_incoming_message(self, channel, method, properties, message):
        self.is_busy = True
        processed_result, is_error = self._delegate(message)

        if is_error:
            # route failures to the dedicated error queue
            self._error_channel.basic_publish(exchange='', routing_key=self._error_queue_name, body=processed_result,
                                              properties=pika.BasicProperties(delivery_mode=2))
        else:
            self._output_channel.basic_publish(exchange='', routing_key=self._output_queue_name, body=processed_result,
                                                properties=pika.BasicProperties(delivery_mode=2))

        # acknowledge the message now that processing is complete
        channel.basic_ack(delivery_tag=method.delivery_tag)

        # close the connection if this operator has been marked for termination, so no further messages are delivered
        if self.mark_to_terminate:
            self._queue_connection.close()

        self.is_busy = False

From my main script I spin up the agents like this:

# spin up the agents (Process here is multiprocessing.Process)
for count in range(spin_up_count):
    instance = Operator(self._translate_and_parse, f'Operator: {time.time()}')
    instance.initialize_operator()
    process = Process(target=instance.consume)
    process.start()
    self._online_agents.append((instance, process))

The problem is that when I call process.start(), it throws a TypeError:
TypeError: can't pickle _thread.lock objects

Full stack trace:

File "C:/Users/adity/Documents/PythonProjects/Caligo/Caligo/QueueService.py", line 201, in _scale_up
process.start()
File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "C:\Users\adity\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

ajsxfq5m 1#

The Operator instance already holds a live pika BlockingConnection and its channels, which contain thread locks and sockets that multiprocessing cannot pickle when it spawns the child process. So don't instantiate the Operator object before starting the forked process, and don't make instance.consume the target of the forked process.
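For illustration, this is in effect what multiprocessing tries to do with your instance on Windows; a minimal sketch assuming the Operator class above is defined and translate_and_parse is a hypothetical stand-in for the asker's self._translate_and_parse delegate:

import pickle

operator = Operator(translate_and_parse, 'Operator: demo')  # translate_and_parse: hypothetical module-level delegate
operator.initialize_operator()  # opens the pika BlockingConnection and channels
pickle.dumps(operator)          # raises TypeError: can't pickle _thread.lock objects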
The Process target should instead be a function that creates the Operator instance inside the child process and then calls its consume method.
If you need to manage the forked processes, track their process IDs and communicate with them using signals.
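A minimal sketch of that structure, assuming the Operator class from the question is importable; run_operator, translate_and_parse and spin_up_count are illustrative names, not part of the original code:

import time
from multiprocessing import Process

def run_operator(identifier):
    # Build the Operator inside the child process, so the pika connection is
    # created after the spawn and never has to be pickled.
    operator = Operator(translate_and_parse, identifier)  # translate_and_parse: assumed module-level (picklable) delegate
    operator.initialize_operator()
    operator.consume()

# spin up the agents; only picklable arguments cross the process boundary
online_agents = []
for count in range(spin_up_count):
    process = Process(target=run_operator, args=(f'Operator: {time.time()}',))
    process.start()
    online_agents.append((process.pid, process))

# shutting down: terminate() works on every platform; on POSIX you could
# instead send a signal to the tracked PID (e.g. os.kill(pid, signal.SIGTERM))
# and handle it inside run_operator for a graceful stop.
for pid, process in online_agents:
    process.terminate()
    process.join()

Because each child builds its own Operator, every process also gets its own connection and channels, which is what pika expects: connections and channels should not be shared across processes.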
