python 并行计算和填充数组

oyxsuwqo  于 2023-02-07  发布在  Python
关注(0)|答案(1)|浏览(184)

作为信号处理任务的一部分,我对每个频率步长进行一些计算。
我有一个频率列表,长度为513
我有一个3D numpy数组A,其形状为shape(81,81,513),其中513是频率点的数量,然后我有一个81x81的矩阵表示每个频率。
我想对每个矩阵做一些修改,得到A的修改版本,我在这里命名为B,它也是形状(81,81,513)
为此,我开始预分配B:

B = np.zeros_like(A)

然后,我在频率上循环并调用一个dothing函数,如下所示:

for index, frequency in enumerate(frequencies):
   B[:,:,index] = dothing(A[:,:,index])

问题是,dothing花费了大量时间,并且连续运行513个频率步长似乎无穷无尽。
所以我想把它并行化。但即使在阅读了大量文档和视频之后,我还是迷失在所有的库和潜在的解决方案中。所有单个频率的计算都可以独立完成。但最终我需要按照正确的顺序将所有内容分配回B。
你知道怎么做吗?
先谢了
安托万

4zcjmb1e

4zcjmb1e1#

这里我将使用一个使用shared_memory的共享数组,因为如果没有两个循环迭代使用相同的内存地址,就不需要保护写访问。我删除了第二个数组以缩短示例(只构建一个共享数组),并且我重新排序了数组形状以更好地保留内存对齐访问。

from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import numpy.typing as npt
from typing import Any
from time import sleep

def dothing(arr: np.ndarray, t_func: Any) -> np.ndarray:
    sleep(.05) #simulate hard work
    return arr * 2

def dodothing(args: tuple[int, Any]):
    global arr
    index = args[0]
    t_func = args[1]
    arr[index] = dothing(arr[index], t_func) #write result back to self to avoid need for 2 shared arrays

def init(shm: SharedMemory, shape: tuple[int, ...], dtype: npt.DTypeLike):
    global arr
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)

if __name__ == "__main__":
    _A = np.ones((513,81,81), np.float64) #source data

    t_funcs = ["some transfer function"] * _A.shape[0] #added example of passing some data + an index

    nbytes = _A.size * _A.itemsize
    dtype = _A.dtype
    shape = _A.shape

    shm = SharedMemory(create=True, size=nbytes)

    A = np.ndarray(shape, dtype=dtype, buffer=shm.buf)

    A[:] = _A[:] #copy contents into shared A

    with Pool(initializer=init, initargs=(shm, shape, dtype)) as pool:
        pool.map(dodothing, enumerate(t_funcs)) #enumerate returns tuple[int,Any] each loop

    print(A.sum()/_A.sum()) #prove we multiplied all elements by 2

    shm.close()
    shm.unlink()

multiprocessing.Pool有时候在目标函数的有效参数方面有点滑稽,所以我倾向于通过池的初始化函数来共享LockQueueshared_memory等内容,该函数像Process一样接受参数。

相关问题