我使用多重处理对不同的文件并行运行workers
。Worker
的结果被放入队列。listener
从队列中获取结果并将其写入文件。
有时listener
可能会遇到错误(各种原因),在这种情况下,listener
会安静地停止,但所有其他进程会继续运行(令人惊讶的是,worker
错误会导致所有进程终止)。
我想在listener
捕获到错误时停止所有进程(worker
s,listener
,e.t.c.)。如何完成?
我的代码方案如下:
def worker(file_path, q):
## do something
q.put(1.)
return True
def listener(q):
while True:
m = q.get()
if m == 'kill':
break
else:
try:
# do something and write to file
except Exception as err:
# raise error
tb = sys.exc_info()[2]
raise err.with_traceback(tb)
def main():
manager = mp.Manager()
q = manager.Queue(maxsize=3)
with mp.Pool(5) as pool:
watcher = pool.apply_async(listener, (q,))
files = ['path_1','path_2','path_3']
jobs = [ pool.apply_async(worker, (p,q,)) for p in files ]
# fire off workers
for job in jobs:
job.get()
# kill the listener when done
q.put('kill')
# run
if __name__ == "__main__":
main()
我尝试引入event = manager.Event()
并将其用作main()中的标志:
## inside the pool, after starting workers
while True:
if event.is_set():
for job in jobs:
job.terminate()
未成功。在listener
异常块中调用os._exit(1)会引发管道断开错误,但进程不会被终止。
我还尝试设置daemon = True,
for job in jobs:
job.daemon = True
没有帮助。
实际上,为了处理listener
异常,我使用了apply_async
所要求的一个可调用函数(这样它们就不会被完全静默)。
先谢谢你了。
1条答案
按热度按时间z18hc3ub1#
和往常一样,有很多方法可以实现您所追求的目标,但我可能会建议使用
Event
来发出进程应该退出的信号。因为它只在您需要map
之类的简单用例时真正简化了事情。您需要的功能。