我目前正在Python上开发一个类似Mathematica的库,名为Mathics,实际上我想让我的程序与多处理模块并行化。
但是我有一个问题,要使用Mathics进行数学评估,我必须打开Mathics会话,通过创建类MathicsSession()的示例。创建这个示例需要很多时间(大约5秒),而且这个示例不能被多处理模块pickle,这意味着每次调用池Map中的函数时,我都必须示例化我的会话,这使得我的多处理代码比我以前的代码更长。这是一个问题,因为我的项目的主要目标是高度并行化…
我不确定我是否可以理解,但我想要的是只示例化我的会话一次,也许每个CPU一次(我不知道是否可能),下面是我的意思的代码示例:
from multiprocessing import Pool, cpu_count
from mathics.session import MathicsSession
def evaluation(session):
return session.evaluate("5+5")
if __name__ == '__main__':
sessions = []
for i in range(50):
sessions.append(MathicsSession())
with Pool(cpu_count()) as pool:
result = pool.map(evaluation, sessions)
以下是显示我的会话无法被pickle的错误:Traceback (most recent call last): File "/home/thales_usradm/PycharmProjects/Test/test2.py", line 16, in <module> result = pool.map(evaluation, sessions) File "/usr/lib/python3.10/multiprocessing/pool.py", line 367, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "/usr/lib/python3.10/multiprocessing/pool.py", line 774, in get raise self._value File "/usr/lib/python3.10/multiprocessing/pool.py", line 540, in _handle_tasks put(task) File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'Builtin.contribute.<locals>.check_options'
也许我可以使示例序列化?
2条答案
按热度按时间nxagd54h1#
我不确定为什么要为每次调用
evaluation
创建一个新的会话,正如您所说的,即使您能够pickle会话,创建会话也是一个相当昂贵的操作。我认为您可能希望为每个池进程创建一个会话,该会话将可重用于在该池进程中执行的 *worker函数 *evaluation
的多次调用。因此,您可以在创建池时指定一个 pool initializer。这是一个在池初始化时在每个池进程中执行一次的函数,它通常会创建一个或多个worker函数可以访问的全局变量。
也许这是一个更现实的例子。我们将计算50个不同的表达式,而不是重新计算50次相同的值:
ru9i0ody2#
我相信你可以对
Pool()
使用initializer
参数,以便每个进程运行一次函数(我认为默认情况下每个CPU运行一次)。本例使用
MyClass
作为不可pickleable对象(不能pickle定义在模块作用域下的类)。对于
initi
,可能有比设置一个全局变量更好的方法,但这显示了一般的想法: