The program works correctly until the size of the to_sum list reaches about 1150. After that, the processes hang at the first task_queue = result_queue point. They successfully fill the result queue and terminate, but then the program hangs. The problem does not occur if the array size is below 1150. Restarting the computer sometimes lets the program handle a larger array before hanging, but the threshold is always in the 1100-1300 range. Do you know what could cause this?
import multiprocessing

class CustomProcess(multiprocessing.Process):
    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock
    def run(self):
        while True:
            """
            Using a lock to avoid a race condition where two processes both get a number, then
            both try to get another, but it is None, so they both put their number back, resulting
            in a result queue with a bigger size. For example:
            Expected result_queue size = 500, current size 499; after summing 1 and 2 we add 3 to
            the result queue, reaching 500.
            With the race condition the result is 501.
            [1, 2, None, None]
            Process 1 gets 1.
            Process 2 gets 2.
            Process 1 gets None and puts 1 in the result queue instead of getting 2 and summing.
            Process 2 gets None and puts 2 in the result queue instead of just getting the first None and returning.
            A lock around both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        # Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    # Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        # Cannot compute the sum of one number, so just add number_1 to
                        # result_queue and terminate, since a poison pill was acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    # Queue empty - put the one number in the result queue and terminate.
                    self.result_queue.put(number_1)
                    return
                self.result_queue.put(number_1 + number_2)
def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()
    task_queue = multiprocessing.JoinableQueue()
    [task_queue.put(element) for element in array]
    task_queue_size = len(array)
    while task_queue_size > 1:
        print(task_queue.qsize(), task_queue_size)
        result_queue = multiprocessing.JoinableQueue()
        processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                     for i in range(8)]
        [task_queue.put(None) for process in processes]
        [process.start() for process in processes]
        #[process.join() for process in processes]
        task_queue.join()
        task_queue = result_queue
        task_queue_size = task_queue_size // 2 + task_queue_size % 2
    return result_queue.get()
if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    """
    If the range is below 1200, the program will run and compute everything correctly.
    If it is above it, it will hang at the first halving, the moment the first task_queue is empty and the
    result_queue becomes the new task_queue.
    A computer restart makes the threshold fluctuate; yesterday it would hang at 1177 but run fine up to 1776.
    Queue pipe full???
    """
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))
2 Answers

vltsax251#
This is too long for a comment, so:
First, calling empty() and qsize() on a multiprocessing queue is unreliable and should not be used (read the docs). Second, the lock you are using prevents any real multiprocessing from taking place, because the bulk of the processing done in the run method executes serially. Third, adding up 1350 numbers takes 1349 additions (how could it be otherwise?). So just split the 1350 numbers into 8 lists as evenly as possible, have each process sum its list and return the result, then add the 8 returned values to get the final total. As the size of to_sum grows, the final addition of the 8 partial sums contributes less and less to the total running time.
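The original demo listing was not preserved in this copy, but a minimal sketch of that chunked approach could look like the following; the helpers split_evenly and sum_chunk and the Pool usage are illustrative choices of mine, not the original code:

import multiprocessing

def sum_chunk(chunk):
    # Each worker process sums its own slice serially.
    return sum(chunk)

def split_evenly(lst, n):
    # Yield n contiguous pieces whose lengths differ by at most 1.
    q, r = divmod(len(lst), n)
    start = 0
    for i in range(n):
        end = start + q + (1 if i < r else 0)
        yield lst[start:end]
        start = end

def multiprocess_sum(array, n_processes=8):
    with multiprocessing.Pool(n_processes) as pool:
        # One chunk per process; the final sum of 8 partial results is cheap.
        return sum(pool.map(sum_chunk, split_evenly(array, n_processes)))

if __name__ == "__main__":
    to_sum = list(range(1350))
    print(multiprocess_sum(to_sum))  # 910575, same as sum(to_sum)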
I wonder how large the to_sum list would have to be before using multiprocessing saves any time.

Update
To do a reduction along the lines of the OP's attempt, I would use the following approach. Since calls to a queue's qsize() are unreliable, we use a shareable integer Value to track the number of items in the queue, and it must be incremented or decremented under the control of a lock. We also create the processes only once. Please read the comments.
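The original listing is missing here as well; the sketch below matches that description under some assumptions of mine: a plain multiprocessing.Queue, a Value('i', ...) counter guarded by its own get_lock(), and workers that claim a pair of items under the lock before touching the queue:

import multiprocessing

def reducer(queue, remaining):
    # Repeatedly claim a pair of items, sum them, and put the result back.
    while True:
        # remaining counts items that are in, or on their way into, the queue;
        # it is only read and modified under its lock.
        with remaining.get_lock():
            if remaining.value < 2:
                return  # only the final total is left: nothing to pair up
            # A reduction removes two items and adds one back: net -1.
            remaining.value -= 1
        a = queue.get()  # may block until another worker's put arrives
        b = queue.get()
        queue.put(a + b)

def multiprocess_sum(array, n_processes=8):
    queue = multiprocessing.Queue()
    remaining = multiprocessing.Value('i', len(array))
    for x in array:
        queue.put(x)
    # The processes are created once, up front.
    processes = [multiprocessing.Process(target=reducer, args=(queue, remaining))
                 for _ in range(n_processes)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return queue.get()

if __name__ == "__main__":
    print(multiprocess_sum(list(range(1350))))  # 910575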
Update 2

But even simpler is to use a multiprocessing pool and pass the worker function pairs of items to be reduced:
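A minimal sketch of that pool-based pairwise reduction; add_pair and the zip_longest pairing are my own illustrative choices, since the original demo is not preserved here:

import multiprocessing
from itertools import zip_longest

def add_pair(pair):
    # Sum one pair; a lone trailing item (padded with None) passes through.
    a, b = pair
    return a if b is None else a + b

def multiprocess_sum(array, n_processes=8):
    with multiprocessing.Pool(n_processes) as pool:
        while len(array) > 1:
            # Pair adjacent items; an odd-length list leaves one unpaired item.
            pairs = zip_longest(array[::2], array[1::2])
            array = pool.map(add_pair, pairs)
    return array[0]

if __name__ == "__main__":
    print(multiprocess_sum(list(range(1350))))  # 910575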
h7appiyu2#
Using a manager solved the problem.

I have to agree with user Booboo that this particular implementation is not the best (it may actually be the worst) for a high-performance parallel summation function. If you are looking for a parallel sum function, read his answer. If you are interested in why the program hangs on large queues, using a manager should solve your problem. If you have an implementation of a parallel reduce function (one that handles the poison-pill race condition better), please share it, so that a better implementation is available for those specifically looking for one. Best regards, Tary
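For reference, a hedged sketch of what that change could look like against the question's multiprocess_sum, reusing the CustomProcess class from the question unchanged. Only the queue construction changes: a manager's Queue() returns a proxy to a queue.Queue, which still provides task_done() and join():

import multiprocessing

# CustomProcess is the class from the question, unchanged.

def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    manager = multiprocessing.Manager()
    lock = multiprocessing.Lock()
    # Manager-backed queue instead of JoinableQueue.
    task_queue = manager.Queue()
    [task_queue.put(element) for element in array]
    task_queue_size = len(array)
    while task_queue_size > 1:
        result_queue = manager.Queue()
        processes = [CustomProcess(name=str(i), task_queue=task_queue,
                                   result_queue=result_queue, lock=lock)
                     for i in range(8)]
        [task_queue.put(None) for _ in processes]
        [process.start() for process in processes]
        task_queue.join()
        task_queue = result_queue
        task_queue_size = task_queue_size // 2 + task_queue_size % 2
    return result_queue.get()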