如何让我的类示例可序列化,以便在Python中进行多处理?

rekjcdws  于 2023-05-30  发布在  Python
关注(0)|答案(2)|浏览(93)

我目前正在Python上开发一个类似Mathematica的库,名为Mathics,实际上我想让我的程序与多处理模块并行化。
但是我有一个问题,要使用Mathics进行数学评估,我必须打开Mathics会话,通过创建类MathicsSession()的示例。创建这个示例需要很多时间(大约5秒),而且这个示例不能被多处理模块pickle,这意味着每次调用池Map中的函数时,我都必须示例化我的会话,这使得我的多处理代码比我以前的代码更长。这是一个问题,因为我的项目的主要目标是高度并行化…
我不确定我是否可以理解,但我想要的是只示例化我的会话一次,也许每个CPU一次(我不知道是否可能),下面是我的意思的代码示例:

from multiprocessing import Pool, cpu_count
from mathics.session import MathicsSession

def evaluation(session):
    return session.evaluate("5+5")

if __name__ == '__main__':

    sessions = []
    for i in range(50):
        sessions.append(MathicsSession())

    with Pool(cpu_count()) as pool:
        result = pool.map(evaluation, sessions)

以下是显示我的会话无法被pickle的错误:
Traceback (most recent call last): File "/home/thales_usradm/PycharmProjects/Test/test2.py", line 16, in <module> result = pool.map(evaluation, sessions) File "/usr/lib/python3.10/multiprocessing/pool.py", line 367, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "/usr/lib/python3.10/multiprocessing/pool.py", line 774, in get raise self._value File "/usr/lib/python3.10/multiprocessing/pool.py", line 540, in _handle_tasks put(task) File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'Builtin.contribute.<locals>.check_options'
也许我可以使示例序列化?

nxagd54h

nxagd54h1#

我不确定为什么要为每次调用evaluation创建一个新的会话,正如您所说的,即使您能够pickle会话,创建会话也是一个相当昂贵的操作。我认为您可能希望为每个池进程创建一个会话,该会话将可重用于在该池进程中执行的 *worker函数 * evaluation的多次调用。
因此,您可以在创建池时指定一个 pool initializer。这是一个在池初始化时在每个池进程中执行一次的函数,它通常会创建一个或多个worker函数可以访问的全局变量。
也许这是一个更现实的例子。我们将计算50个不同的表达式,而不是重新计算50次相同的值:

from multiprocessing import Pool, cpu_count
from mathics.session import MathicsSession

def init_pool_processes():
    """
    Create a session for each pool process.
    """
    global session
    
    session = MathicsSession()
    
    
def evaluation(n):
    # session is now a global variable
    return session.evaluate(f"5+{n}")

if __name__ == '__main__':
    with Pool(cpu_count(), initializer=init_pool_processes) as pool:
        result = pool.map(evaluation, range(50))
ru9i0ody

ru9i0ody2#

我相信你可以对Pool()使用initializer参数,以便每个进程运行一次函数(我认为默认情况下每个CPU运行一次)。
本例使用MyClass作为不可pickleable对象(不能pickle定义在模块作用域下的类)。
对于initi,可能有比设置一个全局变量更好的方法,但这显示了一般的想法:

from multiprocessing import Pool
import time

sub_class = None

def get_sub_class():
    class MyClass():
        pass

    time.sleep(1)
    return MyClass()

def f(x):
    print(id(sub_class), x)

def initi():
    global sub_class
    sub_class = get_sub_class()

if __name__ == '__main__':
    mylist = list(range(5))
    
    with Pool(processes=4, initializer=initi) as pool: 
        pool.map(f, mylist)

相关问题