unix Python multiprocessing - AssertionError: can only join a child process

eyh26e7m posted on 2023-02-19 in Unix
Follow (0) | Answers (3) | Views (124)

This is my first attempt at Python's multiprocessing module and I'm running into some problems. I'm very familiar with the threading module, but I need to make sure the processes I'm executing are running in parallel.
Here is an outline of what I'm trying to do. Please ignore things like undeclared variables and functions, because I can't paste my code in full.

import multiprocessing
import time

def wrap_func_to_run(host, args, output):
    output.append(do_something(host, args))
    return

def func_to_run(host, args):
    return do_something(host, args)

def do_work(server, client, server_args, client_args):
    server_output = func_to_run(server, server_args)
    client_output = func_to_run(client, client_args)
    #handle this output and return a result
    return result

def run_server_client(server, client, server_args, client_args, server_output, client_output):
    server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output))
    server_process.start()  
    client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output))
    client_process.start()
    server_process.join()
    client_process.join()
    #handle the output and return some result    

def run_in_parallel(server, client):
    #set up commands for first process
    server_output = client_output = []
    server_cmd = "cmd"
    client_cmd = "cmd"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output))
    process_one.start()
    #set up second process to run - but this one can run here
    result = do_work(server, client, "some server args", "some client args")
    process_one.join()
    #use outputs above and the result to determine result
    return final_result

def main():
    #grab client
    client = client()
    #grab server
    server = server()
    return run_in_parallel(server, client)

if __name__ == "__main__":
    main()

Here is the error I'm getting:

Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
    p.join()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process

I have tried a lot of different things to fix this, but my feeling is that there is something wrong with the way I'm using this module.
EDIT:
So I created a file that reproduces the error by simulating the client/server and the work they do - and I had left out an important point, which is that I'm running this on Unix. Another important piece of information is that do_work in my actual case involves using os.fork(). I could not reproduce the error without using os.fork(), so I'm assuming the problem is there. In my real case that part of the code is not mine, so I was treating it like a black box (likely a mistake on my part).

#!/usr/bin/python

import multiprocessing
import time
import os
import signal
import sys

class Host():
    def __init__(self):
        self.name = "host"

    def work(self):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self):
        x = 0
        for i in range(10000):
            x+=1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self):
        x = 0
        for i in range(5000):
            x+=1
        print x
        time.sleep(1)

def func_to_run(host, args):
    print host.name + " is working"
    host.work()
    print host.name + ": " + args
    return "done"

def do_work(server, client, server_args, client_args):
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)

    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)

    return (server_output == "done" and client_output =="done")

def run_server_client(server, client, server_args, client_args):
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"

def run_in_parallel(server, client):
    #set up commands for first process
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    result = do_work(server, client, "server args from do work", "client args from do work")
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return result

def main():
    #grab client
    client = Client()
    #grab server
    server = Server()
    return run_in_parallel(server, client)

if __name__ == "__main__":
    main()

If I remove the use of os.fork() in do_work I don't get the error, and the code behaves as I would have expected beforehand (except for the passing of the outputs, which I have accepted as my mistake/misunderstanding). I can change the old code to not use os.fork(), but I would also like to know why it caused this problem and whether there is a workable solution.
EDIT 2:
I started working on a solution that omits os.fork() before the accepted answer came in. Here is what I have, with some tweaking of the amount of simulated work that can be done -

#!/usr/bin/python

import multiprocessing
import time
import os
import signal
import sys
from Queue import Empty

class Host():
    def __init__(self):
        self.name = "host"

    def work(self, w):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self, w):
        x = 0
        for i in range(w):
            x+=1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self, w):
        x = 0
        for i in range(w):
            x+=1
        print x
        time.sleep(1)

def func_to_run(host, args, w, q):
    print host.name + " is working"
    host.work(w)
    print host.name + ": " + args
    q.put("ZERO")
    return "done"

def handle_queue(queue):
    done = False
    results = []
    return_val = 0
    while not done:
        #try to grab item from Queue
        tr = None
        try:
            tr = queue.get_nowait()
            print "found element in queue"
            print tr
        except Empty:
            done = True
        if tr is not None:
            results.append(tr)
    for el in results:
        if el != "ZERO":
            return_val = 1
    return return_val

# leftover from the first version above - no longer called, since run_in_parallel
# below now calls run_server_client directly (its func_to_run() calls still use
# the old two-argument signature)
def do_work(server, client, server_args, client_args):
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)

    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)

    return (server_output == "done" and client_output =="done")


def run_server_client(server, client, server_args, client_args, w, mq):
    local_queue = multiprocessing.Queue()
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"
    if handle_queue(local_queue) == 0:
        mq.put("ZERO")

def run_in_parallel(server, client):
    #set up commands for first process
    master_queue = multiprocessing.Queue()
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    #result = do_work(server, client, "server args from do work", "client args from do work")
    run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue)
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return_val = handle_queue(master_queue)
    print return_val
    return return_val

def main():
    #grab client
    client = Client()
    #grab server
    server = Server()
    val = run_in_parallel(server, client)
    if val:
        print "failed"
    else:
        print "passed"
    return val

if __name__ == "__main__":
    main()

This code has some tweaked print statements just to see exactly what is happening. I used a multiprocessing.Queue to store and share the outputs across the processes and back into my main thread to be handled. I think this solves the Python part of my problem, but there are still some issues in the code I'm working on. The only other thing I can say is that the equivalent of func_to_run involves sending a command over ssh and grabbing any errors along with the output. For some reason this works perfectly fine for commands with a short execution time, but not well for commands with a much larger execution time/output. I tried simulating this with drastically different work values in the code here, but could not reproduce similar results.
EDIT 3: The library code I'm using (again, not mine) uses Popen.wait() for the ssh commands, and I just read this:
Popen.wait(): Wait for child process to terminate. Set and return returncode attribute.
Warning: This will deadlock when using stdout=PIPE and/or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use communicate() to avoid that.
I adjusted the code to not buffer and just print output as it is received, and everything works.
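
For reference, a minimal sketch of the communicate()-based pattern the docs recommend instead of wait(); the ssh command line below is only a placeholder, since the real library code isn't shown here:

import subprocess

# placeholder command - stands in for the library's actual ssh invocation
cmd = ["ssh", "somehost", "some_long_running_command"]

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# communicate() keeps reading stdout/stderr until EOF and only then reaps the
# process, so the OS pipe buffer can never fill up and block the child the way
# Popen.wait() can when there is a lot of output
out, err = proc.communicate()
print(proc.returncode)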

dddzy1tm 1#

I can change the old code to not use os.fork(), but I would also like to know why it caused this problem and whether there is a workable solution.
The key to understanding the problem is knowing exactly what fork() does. The CPython docs state "Fork a child process.", but that presumes you understand the C library call fork().
Here is what glibc's manpage says about it:
fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent, except for the following points: ...
It is basically as if you took a copy of your program and its program state (heap, stack, instruction pointer, etc.) and let it run independently of the original execution. When this child exits normally it will call exit(), and that triggers the atexit() handler registered by the multiprocessing module.
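
To make that concrete, here is a minimal sketch (written against Python 2.7 as in your traceback, though Python 3 fails in essentially the same way) that reproduces the assertion without any of your code - the raw fork() child inherits the parent's bookkeeping for the already-started Process, and the multiprocessing exit handler then tries to join it:

import multiprocessing
import os
import sys
import time

def worker():
    time.sleep(2)

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker)
    p.start()            # the parent now tracks p as one of its children
    pid = os.fork()      # raw fork: the child inherits that bookkeeping
    if pid == 0:
        # atexit runs multiprocessing's _exit_function here, which calls
        # p.join() - but p's recorded parent pid is not this process's pid,
        # so it raises "AssertionError: can only join a child process"
        sys.exit(0)
    os.waitpid(pid, 0)
    p.join()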
What can you do to avoid it?

  • skip os.fork(): use multiprocessing instead, like you are exploring now
  • might work: import multiprocessing after executing the fork(), only as necessary in the child or the parent.
  • use _exit() in the child (the CPython docs state, "Note: the standard way to exit is sys.exit(n). _exit() should normally only be used in the child process after a fork()."), as sketched below

https://docs.python.org/2/library/os.html#os._exit
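
A minimal sketch of that last option, simplified from the do_work() in the question (the SIGTERM handling is dropped, and the child reports success through its exit status rather than a string, since variables set after fork() never make it back to the parent anyway):

import os

def do_work(server, client, server_args, client_args):
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        # os._exit() skips the atexit handlers (including multiprocessing's
        # _exit_function), so the inherited Process objects are never joined
        os._exit(0 if server_output == "done" else 1)

    client_output = func_to_run(client, client_args)
    (pid, status) = os.waitpid(child_pid, 0)
    return (os.WEXITSTATUS(status) == 0 and client_output == "done")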

6ju8rftf 2#

It looks to me like you are threading it one time too many. I would not thread it from run_in_parallel, but simply call run_server_client with the appropriate arguments, because they will thread inside.

e0uiprwp 3#

In addition to the excellent solution from Cain, if you find yourself in my situation where you cannot control how the subprocesses are created, you can try to **unregister the atexit function** in your subprocesses to get rid of these messages:

import atexit
from multiprocessing.util import _exit_function

atexit.unregister(_exit_function)

**Note: this may cause leaks.** For instance, if your subprocesses have children of their own, they will not be cleaned up. So clarify your situation and test thoroughly afterwards.
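
If it helps, a sketch of where that call could go in the question's do_work(); the placement is an assumption, and atexit.unregister() only exists on Python 3, so it would not work as-is on the Python 2.7 interpreter shown in the traceback:

import atexit
import os
import sys
from multiprocessing.util import _exit_function

def do_work(server, client, server_args, client_args):
    child_pid = os.fork()
    if child_pid == 0:
        # in the forked child only: drop multiprocessing's exit hook so that
        # sys.exit() no longer tries to join the parent's Process objects
        atexit.unregister(_exit_function)   # Python 3 only
        server_output = func_to_run(server, server_args)
        sys.exit(0 if server_output == "done" else 1)
    # ... parent side continues exactly as in the question's do_work() ...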
