numpy 将多进程'Pool'与'RawArray'一起使用

ewm0tg9j  于 2023-10-19  发布在  其他
关注(0)|答案(1)|浏览(78)

每当我需要借助共享内存、用 Python 的 multiprocessing 模块并行填充一个巨大的数组时,我都会使用类似下面的方法:

import numpy as np
from multiprocessing import Process, RawArray

def tf(x, arr):
    """Fill slab ``x`` of the shared 10x10x10 float32 cube with random values.

    ``arr`` is a buffer-compatible object (the script passes a
    ``RawArray('f', 1000)``); it is viewed in place, so the write is
    visible to every process sharing that buffer. Returns ``None``.
    """
    # frombuffer creates a view over the shared memory, not a copy.
    flat = np.frombuffer(arr, dtype=np.float32)
    cube = flat.reshape((10, 10, 10))
    cube[x] = np.random.random((10, 10))
    
# Guard the driver code: under the "spawn"/"forkserver" start methods every
# worker re-imports this module, and unguarded code would try to start
# processes again from inside the workers.
if __name__ == "__main__":
    # Shared, lock-free buffer of 1000 C floats (float32), zero-initialised.
    mpa = RawArray('f', 1000)
    ncpu = 4

    procs = []
    for i in range(10):
        proc = Process(target=tf, args=(i, mpa))
        proc.start()
        procs.append(proc)
        # Once ncpu workers are in flight, block on the oldest one before
        # launching more, so at most ncpu processes are active at a time.
        if len(procs) == ncpu:
            procs.pop(0).join()

    # Wait for the remaining workers.
    for p in procs:
        p.join()

    # View the shared buffer with the dtype it was written with. Reading it
    # as uint32 would expose the raw float bit patterns, not the values.
    arr = np.frombuffer(mpa, dtype=np.float32).reshape((10, 10, 10))

这样做是为了确保同时处于活动状态的进程数不超过 CPU 数量。如果我改用 `Pool` 和 `apply_async`,数组却由于某种原因不会被修改。所以我想知道,能否使用 `Pool` 或其他更合适的方式来管理活动进程的数量。
上面的代码可以工作,但效率并不是最高的,因为我只会等待最早添加的那个进程结束,然后才决定是否添加下一个进程。

sz81bmfz

sz81bmfz1#

由于这个问题还没有得到回答,我来分享一个我为这类问题设计的简单 Pool 替代方案。

import numpy as np
from multiprocessing import Process, RawArray
from time import sleep

class Simple_Pool:
    """Minimal process pool that runs queued tasks with bounded concurrency.

    Each queued parameter tuple is executed as ``target(*parameters)`` in its
    own ``Process``; at most ``ncpu`` of them are alive at the same time.
    Because plain ``Process`` objects are used, shared ctypes buffers such as
    ``RawArray`` can be passed as arguments.

    Worker exit codes are not inspected, and the task queue is kept after a
    run, so calling ``run_processes`` again re-runs every queued task.
    """

    def __init__(self, ncpu, target):
        """Store the concurrency limit and the callable run by every task.

        ncpu: maximum number of simultaneously running processes; values
            below 1 are treated as 1 when the tasks are run.
        target: callable invoked in each worker process.
        """
        self.ncpu = ncpu
        self.target = target
        self.params = []

    def add_process(self, parameters):
        """Queue one task; ``parameters`` is the args tuple for ``target``."""
        self.params.append(parameters)

    def run_processes(self, wait_time=1e-3):
        """Run all queued tasks and return once every worker has finished.

        wait_time: seconds to sleep between polls while the pool is full and
            no worker has finished yet.
        """
        # A limit below 1 could never be satisfied; fall back to one worker
        # at a time instead of launching everything unthrottled.
        limit = max(1, self.ncpu)
        running = []
        for params in self.params:
            proc = Process(target=self.target, args=params)
            proc.start()
            running.append(proc)

            # Pool is full: poll until at least one slot is free.
            while len(running) >= limit:
                still_running = []
                for worker in running:
                    if worker.is_alive():
                        still_running.append(worker)
                    else:
                        worker.join()
                if len(still_running) == len(running):
                    # Nothing finished yet; sleep only in this case so a
                    # freed slot is refilled without extra delay.
                    sleep(wait_time)
                running = still_running

        # Drain the workers that are still in flight.
        for worker in running:
            worker.join()

def do_work(i, j, arr, pixel):
    """Simulate work for pixel ``(i, j)`` and store its result in shared memory.

    Prints a progress line, sleeps for ``pixel`` seconds, then writes
    ``pixel`` times 0, 1 and 2 into position ``[i, j]`` of the buffer
    ``arr``, viewed as a float32 array of shape ``(M, N, 3)``. ``M`` and
    ``N`` are read from module globals.
    """
    print(f'Working on pixel ({i}, {j}), waiting {pixel:.3f} seconds')
    sleep(pixel)

    # View the shared buffer as an (M, N, 3) array without copying it.
    shared = np.frombuffer(arr, dtype=np.float32)
    grid = shared.reshape((M, N, 3))

    # Placeholder computation: the pixel value times 0, 1 and 2.
    channels = np.arange(3) * pixel
    grid[i, j] = channels

# Image dimensions and worker limit. These stay at module level, outside the
# __main__ guard, because do_work reads M and N as globals and, under the
# "spawn"/"forkserver" start methods, each worker re-imports this module.
M, N = 10, 5
ncpu = 4

# Guard the driver code so that re-importing this module inside a worker
# process does not try to start the whole pool again.
if __name__ == "__main__":
    # load image from file, here we just use a random array
    image = np.random.random((M, N))

    # create shared memory for the parameters
    parameters = RawArray('f', M * N * 3)

    P = Simple_Pool(ncpu, do_work)

    # queue one task per pixel
    for i in range(M):
        for j in range(N):
            P.add_process((i, j, parameters, image[i, j]))

    # wait_time is the time between checking if any process has finished
    P.run_processes(wait_time=1e-3)

    # get numpy array from the shared memory
    parameters = np.frombuffer(parameters, dtype=np.float32).reshape((M, N, 3))

相关问题