如何在python中使用multiprocessing/process_map等来加速繁重的计算(例如numpy)?

jc3wubiy  于 9个月前  发布在  Python
关注(0)|答案(1)|浏览(114)

我有一个用Numpy写的函数,每次调用花费大约2秒,但我必须运行它数万次。很明显,我应该使用多线程或多处理,最简单的方法可能是在tqdm.contrib.concurrent中使用process_mapthread_map。但我发现它并不像预期的那样工作。
下面是我测试过的一个例子:

import time
import numpy as np
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map, thread_map

def mydataset(size, length):
    for ii in range(length):
        yield np.random.rand(*size)

def calc(mat):
    # simulate some heavy calculation
    for ii in range(1000):
        avg = np.mean(mat)
        std = np.std(mat)
    return avg, std

def main():
    ds = list(mydataset((500,500), 100))

    t0 = time.time()
    res1 = []
    for mat in tqdm(ds):
        res1.append(calc(mat))
    print(f'for loop: {time.time() - t0}s')

    t0 = time.time()
    res2 = list(map(calc, tqdm(ds)))
    print(f'native map: {time.time() - t0}s')

    t0 = time.time()
    res3 = process_map(calc, ds)
    print(f'process map: {time.time() - t0}s')

    t0 = time.time()
    res4 = thread_map(calc, ds)
    print(f'thread map: {time.time() - t0}s')

    pass

if __name__ == '__main__':
    main()

字符串
结果出来了:

100%|████████████████████████████████████████████| 100/100 [00:51<00:00,  1.93it/s]
for loop: 51.884708642959595s
100%|████████████████████████████████████████████| 100/100 [00:52<00:00,  1.91it/s]
native map: 52.48910164833069s
100%|████████████████████████████████████████████| 100/100 [01:10<00:00,  1.41it/s]
process map: 71.0565574169159s
100%|████████████████████████████████████████████| 100/100 [00:41<00:00,  2.39it/s]
thread map: 42.04276633262634s


process_map比for循环慢得多,thread_map也花费了太多时间,考虑到我的PC中有28个核心。是的,CPU使用率没有预期的高,只有1到2个核心是100%。
而且似乎当我的calc做一些简单的事情时,process_map确实有加速。
我的测试环境是openSUSE Tumbleweed with Linux 6.63python 3.10.10 | packaged by conda-forge | (main, Mar 24 2023, 20:08:06) [GCC 11.3.0]tqdm 4.66.1
我也在windows 11 x64下用python 3.8.10 (tags/v3.8.10:3d8993a, May 3 2021, 11:48:03) [MSC v.1928 64 bit (AMD64)]tqdm 4.62.3尝试了之前的代码,结果差不多,taskmgr中CPU使用率达到100%(所有核心),但process_map(64.32秒)和thread_map(66.13秒)比for-loop(78.41秒)和map(79.79秒)稍快。
所以我的问题是,**在有大量计算的情况下,如何正确地进行多处理/多线程加速?**如何在不手动拆分数据集并同时运行多个for循环的情况下加速这些计算?

628mspwn

628mspwn1#

我测试了你的代码,在使用它的时候(4核)达到了64秒。后来我增加了核数到6和8,但只得到了很小的增益。所以它不受计算本身的限制,而是受矩阵复制的限制。
所以我改变了你的代码,使用管理器来保存你的数据。这只会在计算开始时复制一次矩阵。我运行了循环,获得了2秒的时间。这意味着,避免复制数据使它更快。复制的唯一内容是对管理器的引用和任务的一些整数。
下面是我的代码:

import time
import numpy as np
from multiprocessing import Pool, Manager

def mydataset(size, length):
    for ii in range(length):
        yield np.random.rand(*size)

def calc(idx, mat_list):
    # simulate some heavy calculation
    for ii in range(1000):
        avg = np.mean(mat_list[idx])
        std = np.std(mat_list[idx])
    return avg, std

def main():
    ds = list(mydataset((500, 500), 100))

    mypool = Pool(4)
    manager = Manager()
    mylist = manager.list(ds)

    t0 = time.time()
    res1 = mypool.starmap(calc, zip(range(len(ds)), mylist))
    print(f"map manager: {time.time() - t0}s")

if __name__ == "__main__":
    main()

字符串
输出量:

map manager: 1.935054063796997s


因此,在多处理过程中,尽量避免大量的复制操作,因为它们通常会成为操作的瓶颈。如果您有大量数据,请使用共享概念,其中数据仅复制一次,然后从该源读取。

相关问题