Python脚本在使用multiprocessing.Queue()时突然卡住

x3naxklr  于 2023-09-29  发布在  Python
关注(0)|答案(1)|浏览(219)

我试图在Python中使用pythonocc-core创建一个复杂的CAD几何图形。为了加快计算速度,我使用内置的并行计算在一个复杂的固体切割孔。为了克服内存限制,我将切割过程分批进行,并且为了避免以前遇到的内存泄漏,我使用multiprocessing.Process()在单独的进程中进行切割操作。最简单的传递结果的方法是,首先将部分几何体写入步骤文件。但是,我想尝试直接返回剪切的结果,而不需要编写文件。
我的代码看起来像这样:

QQ=multiprocessing.Manager().Queue() # I have also tried with multiprocessing.Queue() but the result is the same

for block in list_blocks:
    p1=multiprocessing.Process(target=cutting_function, args=(result, block, QQ, #others)
    p1.start()

    result=QQ.get()
    p1.join()

函数是这样的:

def cutting_function(result, block, QQ, #others):
    #various calculations
    QQ.put(result)

程序执行了一定数量的(少量)迭代,然后突然似乎卡住了,不进行下一次迭代(与前一次迭代相比,我等待了大量的时间,但没有进步)
我在Anaconda环境中使用Python 3.11.5,在Windows 11上使用WSL 2 Ubuntu
我试过multiprocessing.Queue()和multiprocessing.Manager().Queue()我试过在for循环之外创建一次QQ,但也试过在每个循环中重新创建它我也试过在QQ.get()之前加入p1,但没有区别。

fnvucqvd

fnvucqvd1#

下面是一个很长的答案。简短版本:我很确定您对cutting_function的一个调用在将任何内容放入队列之前就结束了。所以主进程永远等待队列中的某个东西。如果您知道如何纠正cutting_function中的“结束而不将某些内容放入队列”错误,那么就去做吧,而无需阅读其余部分。否则:……
所以,如果我理解正确的话,你并不真的需要多处理。您只想隔离子进程中的每个调用,以便由该进程创建的任何混乱(内存泄漏)都随之消失。
不用说,我想,找到内存泄漏的原因并解决它会更好。
那你现在做的就是

import multiprocessing

QQ=multiprocessing.Manager().Queue()

def cutting_function(arg, QQ):
    QQ.put(1000+arg)

for i in range(100):
    p1=multiprocessing.Process(target=cutting_function, args=(i, QQ))
    p1.start()
    r=QQ.get()
    p1.join()
    print(i, r)

它运行起来没有任何问题,显示了100行,从0 100099 1099。我想是的。而且没有任何类型的并行性(即使我有一个复杂的计算,而不是1000+arg,它也会是一个接一个的)。
请注意,我跳过了经理。如果只有一个进程,就不需要管理器。
问题是,使用队列获取返回值的方式意味着每个进程只在其上放置一个值。
看另一个例子

import multiprocessing

QQ=multiprocessing.Manager().Queue()

def cutting_function(arg, QQ):
    if arg==50: return
    QQ.put(1000+arg)

for i in range(100):
    p1=multiprocessing.Process(target=cutting_function, args=(i, QQ))
    p1.start()
    r=QQ.get()
    p1.join()
    print(i, r)

这一次它工作得很好,很好…直到第50轮。在这一点上,它只是冻结。应该是的第50个进程运行并退出,没有将任何内容放入队列。然后你试图从队列中接收一些东西。什么都没有。所以它会阻塞,等待有东西出现。
注意,这个例子的一个变体可以是这个函数

def cutting_function(arg, QQ):
    QQ.put(math.log(49-arg))

我的观点是:为什么你可能会得到不值的原因,它是因为一个进程崩溃之前,它可以返回任何东西。如果没有在单独的进程中运行,该崩溃将导致整个代码崩溃。但在这里,它只是一个崩溃的过程。它在屏幕上显示错误消息,但主进程仍在继续。仍然在等待队列中出现的东西。
你的用法可能会让人惊讶。人们可能会想,在流程完成后,它怎么会期望队列中出现一些东西。但那是因为你对队列的使用并不是通常的那种。通常情况下,当你并行运行10个进程时,应该使用它,并且在主进程中,等待结果,无论顺序如何(不一定是你开始进程的顺序)。
对于您的“单处理”用法,使用非常顺序的:我启动一个进程,进程结束,我得到唯一的结果,Value可能更适合(使用默认值,以便您可以知道,当进程完成时,没有返回值)。
另一种解决方案是简单地测试队列是否为空

import multiprocessing
import math

QQ=multiprocessing.Manager().Queue()

def cutting_function(arg, QQ):
    QQ.put(math.log((49-arg)**2))

for i in range(100):
    p1=multiprocessing.Process(target=cutting_function, args=(i, QQ))
    p1.start()
    if QQ.empty():
        print("no result")
    else:
        r=QQ.get()
        print(i, r)
    p1.join()

第50行显示错误(在子进程中),然后在主进程中显示“无结果”。但之后它继续。
然而,multiprocessing文档指出,在QQ.put()和该结果可用之间可能存在“无穷小时间”。所以,有一个轻微的风险(即使有数百万次尝试,我也看不到),队列在很短的时间内是空的。如果是QQ.get(),那就不重要了。这意味着QQ.get()等待的时间是无限小的。对于我的后一种解决方案,这种轻微的风险是我们错过一次结果的风险。然后每个QQ.get()得到前一个结果(第100个结果丢失)。
因此,另一种解决方案是确保每个子流程都推送一些内容

import multiprocessing
import math

QQ=multiprocessing.Manager().Queue()

def cutting_function(arg, QQ):
    try:
        QQ.put(math.log((49-arg)**2))
        return
    except:
        QQ.put('error')
        return
    QQ.put('no result')

for i in range(100):
    p1=multiprocessing.Process(target=cutting_function, args=(i, QQ))
    p1.start()
    r=QQ.get()
    print(i, r)
    p1.join()

注意QQ.put后面的return。这是为了确保我们不会把两件事放在队列中。在这样一个简单的代码,没有风险。但在更复杂的代码中,这意味着如果我们到达了末尾(函数的最后一行),这要么是因为我们崩溃了,要么是因为我们忘记在队列中放入一些东西。
那些error/no result是哨兵。确保队列中有东西,即使什么都没有。
还请注意,当我说“崩溃”时,我指的是Python错误。一个引发异常的异常,可以被try/except捕获。如果它是一个真实的的崩溃(例如segfault)。只有当你在python解释器中发现一个bug时,或者如果你玩了一些火,比如C扩展代码,或者ctypes),那么我的except将什么也不会捕获,所以队列中什么也没有。尽管如此,主进程仍然会很好地活着,因为它们是不同的进程。

import multiprocessing
import math
import ctypes

QQ=multiprocessing.Manager().Queue()

def cutting_function(arg, QQ):
    try:
        if arg==30:
            i=ctypes.c_int(1)
            p=ctypes.pointer(i)
            x=p[-1000000000] # Should segfault
        QQ.put(math.log((49-arg)**2))
        return
    except:
        pass
    QQ.put('no result')

for i in range(100):
    p1=multiprocessing.Process(target=cutting_function, args=(i, QQ))
    p1.start()
    r=QQ.get()
    print(i, r)
    p1.join()

此示例在第30行冻结。因为第30道工序segfault。因此,它不仅没有QQ.putlog(49-30),而且也没有QQ.put哨兵:整个Python进程崩溃了。但是,主进程仍然在那里,等待QQ.get()在队列中找到一些东西。
所以,如果有发生这种情况的风险,你可以混合吊带和腰带:

import multiprocessing
import math
import ctypes

QQ=multiprocessing.Manager().Queue()

def cutting_function(arg, QQ):
    try:
        if arg==30:
            i=ctypes.c_int(1)
            p=ctypes.pointer(i)
            x=p[-1000000000] # Should segfault
        QQ.put(math.log((49-arg)**2))
        return
    except:
        pass
    QQ.put('no result')

for i in range(100):
    p1=multiprocessing.Process(target=cutting_function, args=(i, QQ))
    p1.start()
    if QQ.empty():
         time.sleep(1)
    if QQ.empty():
         print("subprocess crashed")
    else:
        r=QQ.get()
        print(i, r)
    p1.join()

所以,如果进程正常工作,那么,如果队列有一个结果。
如果进程忘记返回队列中的某些内容,或者引发异常,那么队列中会有一个哨兵(“no result”或“error”),所以队列中仍然有一些内容。

如果进程完全崩溃(例如,由于segfault),那么QQ.empty会看到它并处理这种情况。最后,如果出现了“无限小的时间”,并且QQ.empty在进程没有崩溃的情况下,我们给予消息第二次到达的机会(1秒远远超过了任何可以被限定为“无限小的时间”的时间)。
所以,在这里,我会说,你可以很肯定,你永远不会陷入等待一个空队列。

相关问题