python 丢失对子进程中线程的引用

kognpnkq  于 2023-02-11  发布在  Python
关注(0)|答案(1)|浏览(101)

我正在构建一个应用程序,它使用多处理和多线程模块处理不同的子进程和线程。在子进程中启动线程时,我得到了一些意外的结果。
我有一个从套接字读取数据并调用API的子进程服务器。

服务器子进程:

class ServerSubprocess(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        ...
        self._cnnr = ApiConnector()
        self._cnnr.connect()

    def run(self) -> None:
        while True:
            ...
            req_params = socket.recv()
            self.request_data(req_params)

    def request_data(self, req_params, max_retries=5):
        retries = 0

        while retries < max_retries:
            try:
                resp = self._cnnr.request_api(**req_params)
            except (ConnectionError, TimeoutError) as e:
                pass

            reties += 1
        return resp

连接器类用作API和服务器之间的接口:

连接器类

class Connector(ConnectorInterface):

def __init__(self):
    self._api = Api(name, host, port)

def connect(self,, n_attempts=10):
    n = 0

    while n < n_attempts:
        try:
            self._api.connect()
        except (asyncio.TimeoutError, ConnectionError):
            n += 1
            self._logger.info(f"Couldn't connect. Attempting again... attempt {n + 1}")

def request_api(self, sec, time_params, side):
    bars = self._api.request_data(req_params)

API由一个客户端(main_thread)和一个read_thread(读取对API请求的响应)组成,客户端将另一个套接字连接到数据源:

API客户端:

class Api:

    def __init__(self, name: str, host: str, port: int):
        self._host = host
        self._port = port
        self._name = name

        
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._read_thread = threading.Thread(group=None, target=self,
                                             name="%s-reader" % self._name,
                                             args=(), kwargs={}, daemon=None)

    def connect(self) -> None:
        self._sock.connect((self._host, self._port))
        self._send_connect_message()
        self._read_thread.start()
     

    def __call__(self)
        while not self._stop.is_set():
            if self._read_messages():
                self._process_messages()

    def _process_messages():
    """Retrieves req_id from message and process raw message"""
       ....
       req_id, data = self._read_from_socket()
       self._data_buff[req_id] = data
       self_notify_response(req_id)

    
    def _read_messages(self) -> bool:
    """Checks if socket has data"""
       ....

    def _notify_response(self, req_id) -> bool:
    """Sets the event for a req_id"""
       self.__req_event[req_id].set()

    def request_data(self, data) -> np.array:
        req_id = self._get_next_req_id()
        self._req_event[req_id] = threading.Event()
        self._send_cmd(data)
        self._req_event[req_id].wait(timeout=timeout)
        data = self._data_buff[req_id]
        return data

这里的问题不在于API或连接器的实现。我所包含的代码是一个简化版本,以显示组件的主要流程。问题在于使用子进程 Package 服务器。即使连接器执行其connect方法后,API上的read_thread运行没有问题,当请求数据时()方法在_req_eventdict中创建新的request_id,然后read_thread检查该req_id的传入响应,read_thread上没有确认更新。因此,当read_thread试图检索不存在键的req_id时,引发KeyError异常。此外,在发送新请求时调试API之后,read_thread看起来停止,即使它是在API init上启动的。这解释了为什么read_thread没有确认新api请求的新req_id。
然而,有趣的是,当服务器作为线程而不是进程启动时,整个流程工作正常。这对我来说没有意义,因为两个线程都在同一个pid下。似乎,一旦在API connect()上启动read_thread,对read_thread的引用就丢失了。显然,我错过了在子进程上启动线程的一些内容。
欢迎任何帮助。

wnavrhmk

wnavrhmk1#

问题出在我如何连接新进程中的连接器上。连接是在子进程的上下文之外创建的。线程read_thread是在父进程中启动的,而不是在子进程中启动的。这就是为什么我失去了引用,我的意思是我没有真正失去引用,因为线程从未在子进程中启动过!要解决这个问题,self._cnnr.connect()应该在run()中。
这是因为没有正确理解multiprocessing.Process对象。运行时是在run()方法中定义的,而不是在init中。这意味着线程应该在run方法中启动,而不是在init中启动。

相关问题