python 跨进程共享代理对象的示例会导致pickle错误

j2cgzkjk  于 2023-02-07  发布在  Python
关注(0)|答案(2)|浏览(138)

bounty已结束。此问题的答案可获得+150声望奖励。奖励宽限期将在12小时后结束。lezebulon正在查找规范答案:我试着理解我所做的事情是否应该按照python spec/doc工作(然后是一个bug),或者不是

我尝试在python中实现一个简单的共享对象系统,我做了以下工作:

import os
from multiprocessing.managers import SyncManager

    if __name__ == '__main__':
        manager = SyncManager(authkey=b'test')
        manager.start()
        address = manager.address
        d = manager.dict()
        pickled_dict = d.__reduce__()
        pickled_dict[1][-1]["authkey"] = b"test"
        print(pickled_dict)
        for i in range(1000):
            d[i] = i
    
        child_id = os.fork()
    
        if child_id != 0:
            # in parent, do work on the proxy object forever
            i = 0
            while True:
                d[i%1000] = i%3434
                i += 1
        else:
            # in children
    
            # connect to manager process
            child_manager = SyncManager(address=address, authkey=b'test')
            child_manager.connect()
    
            # rebuild the dictionary proxy
            proxy_obj = pickled_dict[0](*pickled_dict[1])
    
            # read on the proxy object forever
            while True:
                print(list(proxy_obj.values())[:10])

但是在Python 3.9中,这会因为各种pickle错误而失败,比如_pickle.UnpicklingError: invalid load key, '\x0a'.
我在这里做错了什么吗?AFAIK应该可以并发地(几个进程)从Manager对象读取/写入Manager对象(仅供参考,我也在Python上创建了一个问题:https://github.com/python/cpython/issues/101320,但还没有答案)

b4lqfgs4

b4lqfgs41#

下面的代码你可能很熟悉,所以我告诉你一些你可能已经知道的东西。但是除了你可能"浪费"了5分钟阅读这段代码之外,这并没有什么坏处。即使这样,你也可能会发现一些有价值的东西。
我真的认为你没有必要使用底层调用__reduce__fork,所以这个答案并没有试图让这些调用为你工作,但是,它确实解决了我所理解的你的主要目标,即......"尝试在python中在多个进程之间实现一个简单的共享对象系统",通过展示如何实现一个多平台解决方案。
下面的代码以一种平台无关的方式创建了一个新的进程,并且依赖于Python知道如何序列化代理对象。我还修改了你的代码,使它不会无限期地运行:

from multiprocessing.managers import SyncManager
from multiprocessing import Process

def child_process(address, proxy_obj):
    child_manager = SyncManager(address=address, authkey=b'test')
    child_manager.connect()

    print(list(proxy_obj.values())[:10])

if __name__ == '__main__':
    with SyncManager(authkey=b'test') as manager:
        address = manager.address
        # Instead of making 1000 slow calls on a proxy,
        # do this instead:
        d = manager.dict({i: i for i in range(1000)})
        p = Process(target=child_process, args=(address, d))
        p.start()
        p.join()

图纸:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    • 注:**

你说,"...应该可以同时进行读/写(多个进程)from/to a Manager对象"--的确如此!对代理对象的每次调用都会导致调用及其参数被序列化并发送到Manager的进程,以便由线程反序列化和执行。因此,只要调用是线程-安全应该没有问题,这是基础实现是标准字典的托管字典的情况。但请考虑客户端进程执行代码的情况,如下所示:

proxy_obj[i] += 1

如果多个进程可能尝试递增同一个值,则需要在multiprocessing.Lock示例的控制下完成,因为递增字典值需要对字典进行两次连续调用,即一个用于检索当前值,另一个用于使用递增后的值更新字典。因此,每当您的更新不是原子更新时,您需要将更新视为 *临界区 *,其执行必须被序列化。例如,

from multiprocessing.managers import SyncManager
from multiprocessing import Process, Lock

def child_process(address, proxy_obj, lock):
    child_manager = SyncManager(address=address, authkey=b'test')
    child_manager.connect()

    for i in range(10):
        with lock:
            proxy_obj[i] += 1

if __name__ == '__main__':
    with SyncManager(authkey=b'test') as manager:
        address = manager.address
        d = manager.dict({i: i for i in range(10)})
        lock = Lock()
        p = Process(target=child_process, args=(address, d, lock))
        p.start()
        for i in range(10):
            with lock:
                d[i] += 1
        p.join()
        print(list(d.values()))

图纸:

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
q3aa0525

q3aa05252#

正如注解中提到的,问题在于父进程和子进程都使用相同的底层OS套接字缓冲区进行通信,因为这是您正在使用的原始C os.fork的默认行为(它只由操作系统定义,而不是Python),并且不应该在标准python代码中使用。你应该使用multiprocessing模块,因为它做额外的“清理”来确保这些错误不会发生。
在这种情况下,“清理”只是简单地清除multiprocessing.managers.BaseProxy._address_to_local字典,该字典在父字典中具有套接字,它被保存为python全局状态的一部分,该全局状态与打开的套接字连接沿着被继承。

import os
from multiprocessing.managers import SyncManager
import multiprocessing.managers

if __name__ == '__main__':
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    d = manager.dict()
    pickled_dict = d.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)
    for i in range(1000):
        d[i] = i

    child_id = os.fork()

    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            d[i % 1000] = i % 3434
            i += 1
    else:
        # in children
        multiprocessing.managers._address_to_local.clear()

        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])

        # read on the proxy object forever
        while True:
            print(list(proxy_obj.values())[:10])

注意,这将引发另一个错误,当子进程试图终止时,因为它将试图加入它也继承的manager,因为它的引用没有清理,但这不会使您的应用程序崩溃,如果有一个内置模块可以为您的...哦,对,有。
python os模块实现被定义为操作系统相关,仅为操作系统功能提供 Package 器,不需要其他模块的安全性和可移植性。

相关问题