python 如何在macOS上获取'多重处理.queues.Queue.qsize()`?

shyt4zoc  于 2022-12-10  发布在  Python
关注(0)|答案(1)|浏览(121)

这是一个旧问题,表明workaround不工作。
下面是一个完整的例子,显示了建议的方法是如何失败的。取消注解第31行(# self.size.increment(-1) # uncomment this for error)的错误。

import multiprocessing
import os
import time
from multiprocessing import get_context
from multiprocessing.queues import Queue

class SharedCounter(object):
    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        return self.count.value

class MyQueue(Queue):
    def __init__(self, *args, **kwargs):
        super(MyQueue, self).__init__(*args, ctx=get_context(), **kwargs)
        self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super(MyQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        # self.size.increment(-1)  # uncomment this for error
        return super(MyQueue, self).get(*args, **kwargs)

    def qsize(self):
        return self.size.value

    def empty(self):
        return not self.qsize()

    def clear(self):
        while not self.empty():
            self.get()

def worker(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f'[{os.getpid()}]: got {item}')
        time.sleep(1)

if __name__ == '__main__':
    num_processes = 4
    q = MyQueue()
    pool = multiprocessing.Pool(num_processes, worker, (q,))

    for i in range(10):
        q.put("hello")
        q.put("world")

    for i in range(num_processes):
        q.put(None)

    q.close()
    q.join_thread()
    pool.close()
    pool.join()

由于某种原因,新定义的MyQueue忘记了size属性。

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Process SpawnPoolWorker-2:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Process SpawnPoolWorker-4:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
0x6upsns

0x6upsns1#

您没有覆盖__setstate____getstate__以包含您的变量,pickle使用它们来控制序列化处理有状态对象...因此您应该覆盖它们以将您的变量添加到正在序列化的对象中。

import multiprocessing
import os
import time
from multiprocessing import get_context
from multiprocessing.queues import Queue

class SharedCounter(object):
    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        return self.count.value

class MyQueue(Queue):
    def __init__(self, *args, **kwargs):
        super(MyQueue, self).__init__(*args, ctx=get_context(), **kwargs)
        self.size = SharedCounter(0)

    def __getstate__(self):
        return (super(MyQueue, self).__getstate__(),self.size)
    def __setstate__(self, state):
        super(MyQueue, self).__setstate__(state[0])
        self.size = state[1]
    
    def put(self, *args, **kwargs):
        self.size.increment(1)
        super(MyQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        self.size.increment(-1)  # uncomment this for error
        return super(MyQueue, self).get(*args, **kwargs)

    def qsize(self):
        return self.size.value

    def empty(self):
        return not self.qsize()

    def clear(self):
        while not self.empty():
            self.get()

def worker(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f'[{os.getpid()}]: got {item}')
        time.sleep(1)

if __name__ == '__main__':
    num_processes = 4
    q = MyQueue()
    pool = multiprocessing.Pool(num_processes, initializer=worker, initargs=(q,))

    for i in range(10):
        q.put("hello")
        q.put("world")

    for i in range(num_processes):
        q.put(None)

    q.close()
    q.join_thread()
    pool.close()
    pool.join()

注意,在python 3中,我们不需要使用super(MyQueue, self),因为super()就足够了,并且会使将来重命名类变得更容易,并带来其他可移植性和重构的好处,所以考虑用super()替换任何super(x,y)

相关问题