numpy 在不同脚本中调用python多处理脚本

atmip9wb  于 11个月前  发布在  Python
关注(0)|答案(2)|浏览(119)

我有一个脚本multiprocess.py包含这样的内容:

import os
import numpy as np
from multiprocessing import Pool

abc = np.load('somedata.npy')

def func(k):

    fret1 = k+abc*k
    fret2 = k+5*k^2
    fret3 = k-abc
    
    return fret1, fret2, fret3

def worker(k):
    try:
        result = func(k)
        return result
    except Exception as e:
        print(f"An error occurred for k={k}: {e}")
        return None

def run_mp():
    xyz = np.random.random(100)
    iterator= range(len(xyz))

    # Create a list to store the results
    results = []
    ret1    = []
    ret2    = []
    ret3    = []

    # Use multiprocessing.Pool to run worker function in parallel
    with Pool(os.cpu_count()-1) as pool:
        results = pool.map(worker, iterator) 
    
    # Filter out None values (occurred due to errors)
    results = [result for result in results if result is not None]
    # Unpack the results and collect ret1, ret2, and ret3 values
    for fret1, fret2, fret3 in results:
        ret1.append(fret1)
        ret2.append(fret2)
        ret3.append(fret3)

    print("Multiprocesssing complete")
    np.savez('data.npz', xyz=xyz, ret1=ret1, ret2=ret2, ret3=ret3)

if __name__ == "__main__":
    run_mp()

独立运行时,它运行得很好,但我想在另一个Python脚本中调用它,如下所示:

with open("multiprocess.py") as f:
    exec(f.read())

不幸的是,这似乎不起作用,我得到了一个无限循环。
我也试着用这个命令打开脚本:

if __name__ == "__main__":
    with open("multiprocess.py") as f:
        exec(f.read())

还有这个

from multiprocess import run_mp

run_mp()

PS:我正在工作,并试图在Windows上运行Spyder中的代码,如果这在某种程度上很重要。

dz6r00yl

dz6r00yl1#

你的问题是由2-3种不同的方式独立引起的:
1.如果不使用exec,则无法在调用run_mp时使用导入保护
1.使用exec定义事物会以各种方式破坏multiprocessing
1.(取决于IDE)IDE通常对multiprocessing来说是致命的(它们不以正常的方式调用脚本,并且multiprocessing所需的不变量没有正确建立)
问题原因#1是最简单的:将run_mp排除在导入保护区之外意味着,在除了'fork'(在Windows上不可用)之外的任何启动方法模式下,它都会尝试在每个worker中递归地调用它,因此主进程会产生一个worker池来做工作,而这些worker池又会尝试产生一个worker池来做同样的工作,但它们中没有一个实际上做过任何工作。
关于问题原因#2,具体来说,当将任务分派给worker时,multiprocessing使用pickle模块来序列化函数及其参数。函数通过它们的限定名进行pickle,pickle通过直接查找来确认该限定名是法律的。这有三个含义:
1.你不能把lambda s传递给Pool.map,即使你已经把它们赋给了一个名字,例如。foo = lambda x, y: x + y(因为他们报告的名称是modulename.<lambda>,而不是modulename.foo

  1. exec在没有在全局范围内调用时会中断(因为有问题的模块没有被导入,并且因为globals检查__name__之类的东西时会报告它被exec插入的模块,而不是它来自的模块)。
    1.即使exec在全局范围内执行(这意味着pickle可以找到定义的名称),如果exec出现在if __name__ == '__main__':保护块中,它也不会在子进程中执行,因此工作进程中的 * unpickle * 将失败,即使父进程中的 * pickle * 成功。
    关于问题原因#3,multiprocessing经常在IDE中运行时被破坏,因为许多IDE实际上做了一些与exec非常相似的事情-从IDE Package 器中提取脚本,该IDE Package 器是 * 实际 * __name__ == "__main__"脚本;即使测试仍然在脚本中声明__name__ == "__main__",当pickle检查sys.modules['__main__']以查找函数时,它最终会找到IDE实际运行的框架模块,而不是“真实的”脚本,结果会出错(在分析下运行multiprocessing脚本时也会发生这种情况,python -mcProfile myscript.pycProfile作为sys.modules['__main__']条目,而不是myscript)。
    要解决这些问题,请执行以下操作:
    1.不要在IDE中运行multiprocessing脚本(也不要从交互式终端)。在一个独立的终端中从命令行运行它们(例如,在Windows、cmd.exe或Powershell上,理想情况下通过Windows启动器运行,如果安装为管理员并选中其复选框,例如py -3 path/to/script.py
    1.不要使用exec(一般来说,这是一个好主意,这是一种更正确、更简洁、更高效的方式,因为缓存的导入在每个进程中只加载一次,而不是为每个exec加载;有exec的用例,但它们非常狭窄,它永远不应该是第一个,第二个,第三个,甚至第四个工具
    1.使用您的import后卫一贯;当您使用import时,您没有保护run_mp的调用,因此启动工作进程的过程将尝试在每个工作进程中运行run_mp。当在Windows系统上运行时,这是multiprocessing编程指南的关键部分(其中'spawn'是默认的且 * 唯一 * 可用的启动方法),并且在其他地方坚持它是一个好主意(例如。macOS在最新版本的Python中默认为'spawn''fork'可用,但由于非fork安全的系统库而危险,甚至在Linux/BSD上,当父进程可能很大并且默认的'fork'可能会消耗大量内存时,有些人会选择使用'spawn''forkserver'启动方法,可能导致进程崩溃或由于虚拟内存使用过多而触发OOM Killer)。
    简而言之,您的最终示例用法几乎是正确的(如果从单独的脚本调用,而不是交互式会话,也不是通过IDE)。您只需要将其更改为:
from multiprocess import run_mp

if __name__ == "__main__":
    run_mp()
cgyqldqp

cgyqldqp2#

在Py 3.11上尝试这个,使用一个虚拟的“multiprocess.py”,看起来它在作为子进程(exec部分)执行时无法Pickle函数。
我的建议是使用import并调用“run_mp()”方法,而不是执行脚本。
我的“multiprocess.py“代码:

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

我的“main.py“脚本:

def exec_work():
    exec(open("multiprocess.py").read())

def import_call():
    from question.multiprocess import work
    work()

if __name__ == '__main__':
    exec_work()   # <-- fails due to pickling
    import_call() # <-- works fine

所有内容都在同一个目录中,以避免任何导入问题!

相关问题