python-3.x 为什么并行代码比顺序代码慢

kiayqfof  于 2023-03-04  发布在  Python
关注(0)|答案(2)|浏览(157)

我正在尝试实现一个在线递归并行算法,该算法具有高度的并行性。我的问题是我的python实现无法按照我的要求工作。我有两个二维矩阵,每次在时间步长t观察到新的观察结果时,我都希望递归更新其中的每列。我的并行代码如下所示

def apply_async(t):
    worker =  mp.Pool(processes = 4)
    for i in range(4):
        X[:,i,np.newaxis], b[:,i,np.newaxis] =  worker.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis])).get()

    worker.close()
    worker.join()      



for t in range(p,T):
    count = 0 
    for l in range(p):
        for k in range(4):
            gn[count]=train[t-l-1,k]
            count+=1
    G = G*v +  gn @ gn.T
    Gt = (1/(t-p+1))*G

    if __name__ == '__main__':
        apply_async(t)

这两个矩阵是X和B。我想直接在master的内存中进行替换,因为每个进程只递归更新矩阵的一个特定列。
为什么这种实现方式比顺序方式慢?
有没有什么方法可以在每个时间步重新开始这个过程,而不是杀死它们并重新创建它们?这可能是它变慢的原因吗?

2hh7jdfx

2hh7jdfx1#

原因是,你的程序实际上是顺序的,这是一个示例代码片段,从并行性的Angular 来看与你的相同:

from multiprocessing import Pool
from time import sleep

def gwork( qq):
    print (qq)
    sleep(1)
    return 42

p = Pool(processes=4)

for q in range(1, 10):
    p.apply_async(gwork, args=(q,)).get()
p.close()
p.join()

运行这个程序,你会注意到数字1-9每秒钟出现一次。这是为什么呢?原因是你的.get()。这意味着对apply_async的每个调用实际上都会阻塞在get()中,直到得到结果。它会提交一个任务,等待一秒钟模拟处理延迟,然后返回结果。之后另一个任务被提交到你的池中。这意味着根本没有并行执行在进行。
尝试将池管理部分替换为以下内容:

results = []
for q in range(1, 10):
    res = p.apply_async(gwork, args=(q,))
    results.append(res)
p.close()
p.join()
for r in results:
    print (r.get())

现在你可以看到并行性在起作用,因为你的四个任务现在被同时处理了,你的循环不会阻塞get,因为get被移出了循环,结果只有在它们准备好的时候才被接收。
注意:如果你给工作者的参数或者从它们返回的值是大型数据结构,你会损失一些性能。在实践中,Python将它们实现为队列,并且通过队列传输大量数据相对来说比在子进程被分叉时获得内存中的数据结构副本要慢。

ljsrvy3e

ljsrvy3e2#

我在实现Hannu的代码时一直遇到问题:

results = []
for q in range(1, 10):
    res = p.apply_async(gwork, args=(q,))
    results.append(res)
p.close()
p.join()
for r in results:
    print (r.get())

问题是当循环遇到第一个带有异常的r.get()时,整个程序会退出,因为它没有得到正确的处理,我见过这个方法以几乎相同的方式发布过很多次,但总是导致同样的问题。
我最后将r.get()封装在一个TRY/EXCEPT块中,这样程序就可以处理列表中的所有异常,并按照设计继续运行。

from multiprocessing.pool import Pool
import traceback
    results = []
    pool = Pool(32)
    process_schedule_data = pool.apply_async(TSMDataProcessor().process_schedule_data, args=("Schedule",))
    # a bunch more calls like the one above but to different methods of the same class
    pool.close()
    pool.join()

    for r in results:
        try:
            r.get()
        except BaseException:
            logger.error(f"data processor exception: {traceback.format_exc()}")

相关问题