python-3.x - Terminate all "workers" when the "listener" raises an error (multiprocessing, Manager and Queue setup)

Posted by wgx48brx on 2022-11-26 in Python

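I use multiprocessing to run workers in parallel on different files. The workers' results are put into a queue, and a listener takes the results from the queue and writes them to a file.
Sometimes the listener can hit an error (for various reasons). In that case the listener stops silently, but all the other processes keep running (interestingly, a worker error does cause all processes to terminate).
I want to stop all processes (workers, listener, etc.) when the listener catches an error. How can this be done?
My code outline is as follows: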

import sys
import multiprocessing as mp

def worker(file_path, q):
    ## do something
    q.put(1.)
    return True

def listener(q):
    while True:
        m = q.get()
        if m == 'kill':
            break
        else:
            try:
                # do something and write to file
                pass
            except Exception as err:
                # raise error
                tb = sys.exc_info()[2]
                raise err.with_traceback(tb)

def main():
    manager = mp.Manager()
    q = manager.Queue(maxsize=3)
    with mp.Pool(5) as pool:
        watcher = pool.apply_async(listener, (q,))
        files = ['path_1', 'path_2', 'path_3']
        jobs = [pool.apply_async(worker, (p, q)) for p in files]

        # fire off workers
        for job in jobs:
            job.get()
        # kill the listener when done
        q.put('kill')

# run
if __name__ == "__main__":
    main()
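The silent stop follows from how apply_async reports failures: an exception raised in the submitted function is stored in the returned AsyncResult and re-raised only when .get() is called on it, and main() never calls watcher.get(). The same mechanism explains why a worker error stops everything: job.get() re-raises it in main(), and leaving the with mp.Pool(...) block calls pool.terminate(). A minimal sketch of the first point, using a hypothetical failing_listener as a stand-in:

```python
import multiprocessing as mp

def failing_listener():
    # Hypothetical stand-in for a listener that hits an error
    raise ValueError("listener failed")

def demo():
    with mp.Pool(1) as pool:
        watcher = pool.apply_async(failing_listener)
        watcher.wait()             # returns normally even though the task failed
        ok = watcher.successful()  # False: the exception is stored, not raised
        try:
            watcher.get()          # the stored exception is re-raised only here
        except ValueError as err:
            return ok, str(err)
    return ok, None

if __name__ == "__main__":
    print(demo())  # (False, 'listener failed')
```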

I tried introducing event = manager.Event() and using it as a flag in main():

## inside the pool, after starting workers
while True:
    if event.is_set():
        for job in jobs:
            job.terminate()

This did not work. Calling os._exit(1) in the listener's except block raises a broken pipe error, but the processes are not killed.
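One reason the Event attempt cannot work is that the objects returned by apply_async are AsyncResult instances, which have no terminate() method; termination is done on the Pool itself with pool.terminate(), which stops all of its worker processes. The loop above also never exits when the event is not set. A minimal sketch under the question's setup, assuming the listener sets a hypothetical manager Event named failed before re-raising; the worker and listener bodies are stand-ins (the worker sleeps to mimic a long job):

```python
import multiprocessing as mp
import time

def worker(file_path, q):
    q.put(1.0)     # report a result for file_path
    time.sleep(2)  # hypothetical: keep working on the file for a while
    return True

def listener(q, failed):
    while True:
        m = q.get()
        if m == "kill":
            break
        try:
            raise OSError("could not write results")  # stand-in for a write error
        except Exception:
            failed.set()  # tell main() to tear everything down
            raise

def main():
    files = ["path_1", "path_2", "path_3"]
    with mp.Manager() as manager:
        q = manager.Queue(maxsize=3)
        failed = manager.Event()
        with mp.Pool(5) as pool:
            watcher = pool.apply_async(listener, (q, failed))
            jobs = [pool.apply_async(worker, (p, q)) for p in files]
            # Poll instead of blocking on job.get(), so a listener failure is noticed
            while not all(job.ready() for job in jobs):
                if failed.wait(timeout=0.1):
                    pool.terminate()  # stops every pool process, workers included
                    return "terminated"
            q.put("kill")
            watcher.get()  # re-raises here if the listener failed late
    return "finished"

if __name__ == "__main__":
    print(main())
```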
I also tried setting daemon = True:

for job in jobs:
    job.daemon = True

It did not help.
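Setting daemon on the jobs has no effect because each job is an AsyncResult, not a Process, so the assignment only adds an unused attribute; the Pool's worker processes are already daemonic. A small check, with a hypothetical report_daemon task:

```python
import multiprocessing as mp

def report_daemon(_):
    # Runs inside a pool worker process
    return mp.current_process().daemon

def check():
    with mp.Pool(2) as pool:
        job = pool.apply_async(report_daemon, (None,))
        job.daemon = True  # only adds an attribute to the AsyncResult object
        return job.get()

if __name__ == "__main__":
    print(check())  # True: pool workers are already daemonic
```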
Actually, to handle listener exceptions, I use a callable as apply_async allows (so that they are not completely silent).
Thanks in advance.
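apply_async accepts callback and error_callback arguments, and error_callback is called in the main process with the exception when the task fails, so it can trigger a shutdown rather than only report the error. A minimal sketch, assuming a threading.Event set from a hypothetical on_listener_error callback, with stand-in worker and listener bodies. Because the callback runs in the Pool's result-handling thread, it only records the error and sets the flag, and the main thread calls pool.terminate():

```python
import multiprocessing as mp
import threading
import time

def worker(file_path, q):
    q.put(1.0)     # report a result for file_path
    time.sleep(2)  # hypothetical: keep working on the file for a while
    return True

def listener(q):
    while True:
        if q.get() == "kill":
            break
        raise RuntimeError("could not write results")  # stand-in for a write error

def main():
    files = ["path_1", "path_2", "path_3"]
    listener_failed = threading.Event()  # exists only in the main process
    errors = []

    def on_listener_error(exc):
        # Runs in the Pool's result-handling thread: record and signal, nothing slow
        errors.append(exc)
        listener_failed.set()

    with mp.Manager() as manager:
        q = manager.Queue(maxsize=3)
        with mp.Pool(5) as pool:
            watcher = pool.apply_async(listener, (q,), error_callback=on_listener_error)
            jobs = [pool.apply_async(worker, (p, q)) for p in files]
            while not all(job.ready() for job in jobs):
                if listener_failed.wait(timeout=0.1):
                    pool.terminate()  # kill the remaining workers
                    return "terminated", str(errors[0])
            q.put("kill")
            watcher.wait()  # let the listener finish before the pool is closed
    return ("listener failed", str(errors[0])) if errors else ("finished", None)

if __name__ == "__main__":
    print(main())
```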

Answer 1, by z18hc3ub:

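As always, there are many ways to achieve what you are after, but I would probably suggest using an Event to signal that the processes should quit. I would also not use a Pool here, because it only really simplifies things for simple use cases where you need something like map; beyond that it is easier to start the processes yourself and build exactly the functionality you need.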

from multiprocessing import Process, Queue, Event
from random import random

def might_fail(a):
    assert a > .001  # stand-in for work that occasionally fails

def worker(args_q: Queue, result_q: Queue, do_quit: Event):
    try:
        while not do_quit.is_set():
            args = args_q.get()
            if args is None:
                break
            else:
                # do something
                result_q.put(random())
    finally: #signal that worker is exiting even if exception is raised
        result_q.put(None) #signal listener that worker is exiting

def listener(result_q: Queue, do_quit: Event, n_workers: int):
    n_completed = 0
    while n_workers > 0:
        res = result_q.get()
        if res is None:
            n_workers -= 1
        else:
            n_completed += 1
            try:
                might_fail(res)
            except:
                do_quit.set() #let main continue
                print(n_completed)
                raise #reraise error after we signal others to stop
    do_quit.set() #let main continue
    print(n_completed)

if __name__ == "__main__":
    args_q = Queue()
    result_q = Queue()
    do_quit = Event()
    n_workers = 4

    listener_p = Process(target=listener, args=(result_q, do_quit, n_workers))
    listener_p.start()

    for _ in range(n_workers):
        worker_p = Process(target=worker, args=(args_q, result_q, do_quit))
        worker_p.start()

    for _ in range(1000):
        args_q.put("some/file.txt")

    for _ in range(n_workers):
        args_q.put(None)

    do_quit.wait()
    print('done')
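The main block above returns as soon as do_quit is set and does not check how the listener ended. A sketch of a stricter shutdown, with hypothetical worker and listener stand-ins: main joins the listener, reads its exitcode (1 after an uncaught exception in the target), and calls terminate() on any worker that has not exited within a timeout, for example one stuck in a blocking get():

```python
import time
from multiprocessing import Event, Process

def worker(do_quit):
    # Hypothetical long-running worker that checks the flag between tasks
    while not do_quit.is_set():
        time.sleep(0.05)

def listener(do_quit):
    try:
        raise ValueError("listener failed")  # stand-in for might_fail()
    except Exception:
        do_quit.set()  # signal everyone before re-raising
        raise

def run(n_workers=3):
    do_quit = Event()
    workers = [Process(target=worker, args=(do_quit,)) for _ in range(n_workers)]
    listener_p = Process(target=listener, args=(do_quit,))
    for p in workers + [listener_p]:
        p.start()
    do_quit.wait()
    listener_p.join()
    for p in workers:
        p.join(timeout=5)
        if p.is_alive():
            p.terminate()  # last resort for a worker that ignores the flag
            p.join()
    return listener_p.exitcode, [p.exitcode for p in workers]

if __name__ == "__main__":
    print(run())  # (1, [0, 0, 0]); the listener's traceback goes to stderr
```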
