我正在构建一个应用程序,它使用多处理和多线程模块处理不同的子进程和线程。在子进程中启动线程时,我得到了一些意外的结果。
我有一个从套接字读取数据并调用API的子进程服务器。
服务器子进程:
class ServerSubprocess(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
...
self._cnnr = ApiConnector()
self._cnnr.connect()
def run(self) -> None:
while True:
...
req_params = socket.recv()
self.request_data(req_params)
def request_data(self, req_params, max_retries=5):
retries = 0
while retries < max_retries:
try:
resp = self._cnnr.request_api(**req_params)
except (ConnectionError, TimeoutError) as e:
pass
reties += 1
return resp
连接器类用作API和服务器之间的接口:
连接器类
class Connector(ConnectorInterface):
def __init__(self):
self._api = Api(name, host, port)
def connect(self,, n_attempts=10):
n = 0
while n < n_attempts:
try:
self._api.connect()
except (asyncio.TimeoutError, ConnectionError):
n += 1
self._logger.info(f"Couldn't connect. Attempting again... attempt {n + 1}")
def request_api(self, sec, time_params, side):
bars = self._api.request_data(req_params)
API由一个客户端(main_thread)和一个read_thread(读取对API请求的响应)组成,客户端将另一个套接字连接到数据源:
API客户端:
class Api:
def __init__(self, name: str, host: str, port: int):
self._host = host
self._port = port
self._name = name
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._read_thread = threading.Thread(group=None, target=self,
name="%s-reader" % self._name,
args=(), kwargs={}, daemon=None)
def connect(self) -> None:
self._sock.connect((self._host, self._port))
self._send_connect_message()
self._read_thread.start()
def __call__(self)
while not self._stop.is_set():
if self._read_messages():
self._process_messages()
def _process_messages():
"""Retrieves req_id from message and process raw message"""
....
req_id, data = self._read_from_socket()
self._data_buff[req_id] = data
self_notify_response(req_id)
def _read_messages(self) -> bool:
"""Checks if socket has data"""
....
def _notify_response(self, req_id) -> bool:
"""Sets the event for a req_id"""
self.__req_event[req_id].set()
def request_data(self, data) -> np.array:
req_id = self._get_next_req_id()
self._req_event[req_id] = threading.Event()
self._send_cmd(data)
self._req_event[req_id].wait(timeout=timeout)
data = self._data_buff[req_id]
return data
这里的问题不在于API或连接器的实现。我所包含的代码是一个简化版本,以显示组件的主要流程。问题在于使用子进程 Package 服务器。即使连接器执行其connect方法后,API上的read_thread运行没有问题,当请求数据时()方法在_req_eventdict中创建新的request_id,然后read_thread检查该req_id的传入响应,read_thread上没有确认更新。因此,当read_thread试图检索不存在键的req_id时,引发KeyError异常。此外,在发送新请求时调试API之后,read_thread看起来停止,即使它是在API init上启动的。这解释了为什么read_thread没有确认新api请求的新req_id。
然而,有趣的是,当服务器作为线程而不是进程启动时,整个流程工作正常。这对我来说没有意义,因为两个线程都在同一个pid下。似乎,一旦在API connect()上启动read_thread,对read_thread的引用就丢失了。显然,我错过了在子进程上启动线程的一些内容。
欢迎任何帮助。
1条答案
按热度按时间wnavrhmk1#
问题出在我如何连接新进程中的连接器上。连接是在子进程的上下文之外创建的。线程read_thread是在父进程中启动的,而不是在子进程中启动的。这就是为什么我失去了引用,我的意思是我没有真正失去引用,因为线程从未在子进程中启动过!要解决这个问题,
self._cnnr.connect()
应该在run()中。这是因为没有正确理解multiprocessing.Process对象。运行时是在run()方法中定义的,而不是在init中。这意味着线程应该在run方法中启动,而不是在init中启动。