我想创建一个装饰器,使函数接受单个参数,以并行处理可迭代参数。
下面是示例代码:
import functools
import time
from multiprocessing import Pool
def parallel(func):
def wrapper(iterable):
with Pool() as pool:
result = pool.map(func, iterable)
return result
return wrapper
@parallel
def test(i):
time.sleep(1)
print(f"{i}: {i * i}")
def main():
test(range(10))
if __name__ == "__main__":
main()
但我有
Traceback (most recent call last):
File "/home/user/projects/amdb/s2.py", line 29, in <module>
main()
File "/home/user/projects/amdb/s2.py", line 25, in main
test(range(10))
File "/home/user/projects/amdb/s2.py", line 10, in wrapper
result = pool.map(func, iterable)
File "/usr/lib/python3.10/multiprocessing/pool.py", line 367, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.10/multiprocessing/pool.py", line 774, in get
raise self._value
File "/usr/lib/python3.10/multiprocessing/pool.py", line 540, in _handle_tasks
put(task)
File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function test at 0x7fef63309120>: it's not the same object as __main__.test
我知道我可以通过不使用装饰器来解决这个问题,比如test_multi = parallel(test)
,但问题是如何在装饰器中使用它。
1条答案
按热度按时间4ioopgfo1#
这是因为multiprocessing.Pool需要能够序列化函数和参数以便在进程之间传递,而decorator @parallel使得无法正确序列化测试函数。可以使用
pathos
库,它是multiprocessing
的扩展,可以序列化更多类型,包括函数。请在使用前安装它。pip install pathos
验证码: