Multiprocessing reduce function hangs in Python when the task queue size exceeds 1200 elements

6g8kf2rb posted on 2023-05-16 in Python

The program works fine until the size of the to_sum list reaches about 1150. Beyond that, it hangs at the first point where task_queue = result_queue. The processes successfully fill the result queue and terminate, but then everything hangs. If the array is smaller than about 1150, the problem does not occur. Restarting the computer sometimes lets the program handle a somewhat larger array before hanging, but the threshold always falls in the 1100-1300 range. Any idea what could cause this?

import multiprocessing

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            """
            Use a lock to avoid a race condition where two workers each get a number,
            then each try to get another but receive None, so each puts its number back,
            leaving the result queue larger than expected. For example:
            Expected result_queue size = 500; at current size 499, summing 1 and 2 and
            adding 3 to the result queue reaches 500.
            With the race condition the result is 501.
            [1, 2, None, None]
            Worker 1 gets 1.
            Worker 2 gets 2.
            Worker 1 gets None and puts 1 in the result queue instead of getting 2 and summing.
            Worker 2 gets None and puts 2 in the result queue instead of just getting the first None and returning.
            Holding the lock across both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        #Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    #Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        #Cannot compute sum of 1 number so just add number_1 to result_queue and terminate since poison pill
                        #acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    self.result_queue.put(number_1)
                    #Queue empty, put the 1 number in result queue and terminate.
                    return
            self.result_queue.put(number_1 + number_2)

def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()
    task_queue = multiprocessing.JoinableQueue()
    [task_queue.put(element) for element in to_sum]
    task_queue_size = len(array)

    while task_queue_size > 1:
        print(task_queue.qsize(), task_queue_size)
        result_queue = multiprocessing.JoinableQueue()
        processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                    for i in range(8)]
        [task_queue.put(None) for process in processes]
        [process.start() for process in processes]
        #[process.join() for process in processes]
        task_queue.join()
        task_queue = result_queue
        task_queue_size = task_queue_size // 2 + task_queue_size % 2
    return result_queue.get()

if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    """
    If range is below 1200, the program will run and compute everything correctly.
    If it is above it, it will hang at the first halving, the moment the first task_queue is empty and the
    result_queue becomes the new task_queue. 
    Computer restart will make the range values fluctuate, yesterday it would hang at 1177 but run fine up to 1776.
    Queue pipe full???
    """
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))
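
The "Queue pipe full???" guess in the docstring matches a pitfall the multiprocessing documentation calls out explicitly ("Joining processes that use queues"): a multiprocessing.Queue buffers puts in the producing process, and a background feeder thread flushes them into an OS pipe of limited capacity, so a process that has put more data than the pipe can hold will not terminate until a consumer drains it. A minimal sketch of that documented deadlock (the names here are illustrative, not from the question):

import multiprocessing

def producer(queue):
    # Put far more small items than the underlying OS pipe can buffer.
    for i in range(100_000):
        queue.put(i)
    # On exit, this process waits for its feeder thread to flush the
    # buffered items into the pipe - which cannot happen until someone
    # reads from the other end.

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(queue,))
    p.start()
    p.join()   # Deadlocks: drain the queue *before* joining to avoid this.
    print(queue.get())

The documented remedy is to get() the items before join(), or to use a manager queue (as in answer 2 below), which has no per-producer feeder thread.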
vltsax25 (answer 1)

This is too long for a comment, so:
First, calling empty or qsize on a multiprocessing queue is unreliable and should not be used (read the docs). Second, the lock you are using prevents any real multiprocessing from occurring, because the bulk of the work done in the run method executes serially under it. Third, adding up 1350 numbers requires 1349 additions (how could it be otherwise?). So just split the 1350 numbers as evenly as possible into 8 lists, have each process sum its list and return the result, then add the 8 returned values to get the final total.
As the size of to_sum grows, the final addition of the 8 partial sums contributes less and less to the total running time.

import multiprocessing
from functools import reduce
from operator import add

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.result_queue.put(reduce(add, self.task_queue.get(), 0))

def split(iterable, n):  # function to split iterable in n even parts
    if type(iterable) is range and iterable.step != 1:
        # algorithm doesn't work with steps other than 1:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

N_PROCESSES = 8

def multiprocess_sum(l):
    if len(l) == 1:
        return l[0]

    task_queue = multiprocessing.Queue()
    for part in split(l, N_PROCESSES):
        task_queue.put(part)

    result_queue = multiprocessing.Queue()

    processes = [
        CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue)
        for i in range(N_PROCESSES)
    ]
    for process in processes:
        process.start()

    the_sum = reduce(add, (result_queue.get() for _ in range(N_PROCESSES)), 0)

    for process in processes:
        process.join()

    return the_sum

if __name__ == "__main__":
    to_sum = list(range(1350))
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))

Prints:

910575
910575
910575
910575
910575
910575
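
For reference, the divmod trick in split hands the m leftover elements one each to the first m parts, so part sizes differ by at most one:

print([list(part) for part in split(range(10), 3)])
# [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]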

I would like to know how large the to_sum list has to be before multiprocessing actually saves time.
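
One way to find out empirically is to time both versions over growing inputs (a rough, machine-dependent sketch; the timed helper is my own). For plain integer addition, the pickling and process start-up overhead will most likely dominate at any realistic size, so the parallel version mainly pays off when the reduction step itself is expensive:

import time

def timed(fn, *args):
    # Wall-clock seconds for one call of fn(*args).
    start = time.perf_counter()
    fn(*args)
    return time.perf_counter() - start

if __name__ == "__main__":
    for exponent in range(4, 8):
        data = list(range(10 ** exponent))
        # multiprocess_sum as defined above
        print(len(data), timed(sum, data), timed(multiprocess_sum, data))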
UPDATE

To do the reduction along the lines of the OP's attempt, I would use the following code. Since calls to qsize on a queue are unreliable, we use a sharable integer Value to track the number of items in a queue; it must be incremented and decremented under control of a lock. We also create the processes only once. Please read the comments.

import multiprocessing

class CustomProcess(multiprocessing.Process):

    def __init__(self,
                 name,
                 task_queue,
                 result_queue,
                 task_queue_size,
                 result_queue_size,
                 condition,
                 *args,
                 **kwargs
                 ):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.task_queue_size = task_queue_size
        self.result_queue_size = result_queue_size
        self.condition = condition

    def run(self):
        task_queue = self.task_queue
        result_queue = self.result_queue
        task_queue_size = self.task_queue_size
        result_queue_size = self.result_queue_size
        condition = self.condition

        task_queue_size_lock = task_queue_size.get_lock()

        while True:
            # When the task queue size goes down to zero, the main process
            # will move all the items from the result queue to the
            # task queue and then set the new task queue size.
            # We must not attempt to process the task queue while
            # this occurs, i.e. we must wait until the task queue size
            # is again non-zero:
            with condition:
                while task_queue_size.value == 0:
                    condition.wait()

            # No need to acquire lock for this test:
            if task_queue_size.value < 0:
                return # We are done

            # There is no guarantee we will find anything on the input queue:
            task_queue_size_lock.acquire()
            if task_queue_size.value == 0:
                task_queue_size_lock.release()
                continue

            number_1 = task_queue.get()
            task_queue_size.value -= 1
            if task_queue_size.value == 0:
                # put number on result_queue:
                task_queue_size_lock.release()
                result_queue.put(number_1)
                with result_queue_size.get_lock():
                    result_queue_size.value += 1
                task_queue.task_done()
            else:
                number_2 = task_queue.get()
                task_queue_size.value -= 1
                task_queue_size_lock.release()
                # No lock is held for the actual reduction operation:
                result_queue.put(number_1 + number_2)
                with result_queue_size.get_lock():
                    result_queue_size.value += 1
                # Since we have taken off 2 elements from the task queue:
                task_queue.task_done()
                task_queue.task_done()

def multiprocess_sum(array):
    n = len(array)
    if n == 1:
        return array[0]

    task_queue = multiprocessing.JoinableQueue()
    # You should be iterating array, not to_sum, and
    # using a comprehension for its side effect is not Pythonic:
    for element in array:
        task_queue.put(element)
    task_queue_size = multiprocessing.Value('i', n)

    result_queue = multiprocessing.Queue()
    result_queue_size = multiprocessing.Value('i', 0)

    condition = multiprocessing.Condition()

    processes = [
        CustomProcess(name=str(i),
                      task_queue=task_queue,
                      result_queue=result_queue,
                      task_queue_size=task_queue_size,
                      result_queue_size=result_queue_size,
                      condition=condition,
                      )
        for i in range(8)
    ]
    for process in processes:
        process.start()

    while True:
        print('task queue size:', n)

        # Wait for task_queue to be emptied:
        task_queue.join()

        # Now we can be sure that the child processes are no longer retrieving from the task queue
        # and putting items on the result queue:

        n = result_queue_size.value
        if n == 1:
            print('task queue size:', n)
            result = result_queue.get()
            # Tell child processes to terminate:
            with condition:
                task_queue_size.value = -1
                condition.notify_all()
            return result

        # Child processes get their input from task queue.
        # So we must get items from the result queue and move them to the task queue.
        # The task queue size is now 0 so this should pause the child processes.
        for _ in range(n):
            task_queue.put(result_queue.get())
        result_queue_size.value = 0
        # Allow children to run
        with condition:
            # The new n value will be half of the previous n value (more or less)
            task_queue_size.value = n
            condition.notify_all()

if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    print(sum(to_sum))

    for i in range(5):
        print(multiprocess_sum(to_sum))

Prints:

910575
task queue size: 1350
task queue size: 675
task queue size: 338
task queue size: 169
task queue size: 85
task queue size: 43
task queue size: 22
task queue size: 11
task queue size: 6
task queue size: 3
task queue size: 2
task queue size: 1
910575
task queue size: 1350
task queue size: 675
task queue size: 338
task queue size: 169
task queue size: 85
task queue size: 43
task queue size: 22
task queue size: 11
task queue size: 6
task queue size: 3
task queue size: 2
task queue size: 1
910575
task queue size: 1350
task queue size: 675
etc.
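
As an aside, the shared counters above rely on the fact that a multiprocessing.Value carries an associated lock, returned by get_lock(); a read-modify-write such as += is not atomic across processes without it. A minimal sketch of that pattern in isolation (the names are illustrative):

import multiprocessing

def bump(counter, times):
    for _ in range(times):
        # get_lock() returns the lock created with the Value,
        # making the increment atomic across processes.
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)
    workers = [multiprocessing.Process(target=bump, args=(counter, 10_000))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 40000; without the lock, typically less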

UPDATE 2

But it is far simpler to use a multiprocessing pool and pass the worker function pairs of items to be reduced:

import multiprocessing
from itertools import chain

def custom_process(pair):
    return pair[0] + pair[1] if len(pair) == 2 else pair[0]

def multiprocess_sum(array):
    n = len(array)
    if n == 1:
        return array[0]

    results = array
    pool = multiprocessing.Pool(8)
    while True:
        print('n =', n)
        if n == 1:
            break

        # Create pairs:
        pairs = zip(results[:n//2], results[n//2:])
        # Did we start with an odd number of elements?
        if n % 2 == 1:
            pairs = chain(pairs, ((results[-1],),))
        # a chunksize argument to imap_unordered could improve performance on large inputs:
        results = list(pool.imap_unordered(custom_process, pairs))
        n = len(results)
    result = results[0]
    pool.close()
    pool.join()
    return result

if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    print(sum(to_sum))

    for i in range(5):
        print(multiprocess_sum(to_sum))

Prints:

910575
n = 1350
n = 675
n = 338
n = 169
n = 85
n = 43
n = 22
n = 11
n = 6
n = 3
n = 2
n = 1
910575
n = 1350
n = 675
etc.
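
The pairing scheme of UPDATE 2 generalizes to any associative operation; the following sketch (the parallel_reduce name and _apply helper are my own, not from the answer) assumes a non-empty input and a picklable two-argument function:

import multiprocessing
from functools import partial
from operator import add

def _apply(fn, pair):
    # Reduce a pair, or pass a lone leftover element through unchanged.
    return fn(pair[0], pair[1]) if len(pair) == 2 else pair[0]

def parallel_reduce(fn, items, processes=8):
    results = list(items)          # assumes at least one element
    with multiprocessing.Pool(processes) as pool:
        while len(results) > 1:
            n = len(results)
            pairs = list(zip(results[:n // 2], results[n // 2:]))
            if n % 2:
                pairs.append((results[-1],))   # odd leftover rides along
            results = pool.map(partial(_apply, fn), pairs)
    return results[0]

if __name__ == "__main__":
    print(parallel_reduce(add, range(1350)))   # 910575

As with the answers above, every round re-pickles all surviving elements, so this is illustrative rather than fast.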
h7appiyu (answer 2)

Using a manager solved the problem. A plausible reason: manager.Queue() returns a proxy to a queue living in a separate server process, so puts are not buffered through each child's feeder thread into a size-limited OS pipe, which is where the original version appears to block.

import multiprocessing
"""
Using a manager avoids the problem where the program will hang if the input is too big.
"""

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            """
            Use a lock to avoid a race condition where two workers each get a number,
            then each try to get another but receive None, so each puts its number back,
            leaving the result queue larger than expected. For example:
            Expected result_queue size = 500; at current size 499, summing 1 and 2 and
            adding 3 to the result queue reaches 500.
            With the race condition the result is 501.
            [1, 2, None, None]
            Worker 1 gets 1.
            Worker 2 gets 2.
            Worker 1 gets None and puts 1 in the result queue instead of getting 2 and summing.
            Worker 2 gets None and puts 2 in the result queue instead of just getting the first None and returning.
            Holding the lock across both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        #Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    #Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        #Cannot compute sum of 1 number so just add number_1 to result_queue and terminate since poison pill
                        #acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    self.result_queue.put(number_1)
                    #Queue empty, put the 1 number in result queue and terminate.
                    return
            self.result_queue.put(number_1 + number_2)

def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()

    with multiprocessing.Manager() as manager:

        task_queue = manager.Queue()
        [task_queue.put(element) for element in array]
        task_queue_size = len(array)

        while task_queue_size > 1:
            result_queue = manager.Queue()

            processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                        for i in range(8)]
            [task_queue.put(None) for process in processes]
            [process.start() for process in processes]
            #[process.join() for process in processes]
            task_queue.join()
            task_queue = result_queue
            task_queue_size = task_queue_size // 2 + task_queue_size % 2
        return result_queue.get()

if __name__ == "__main__":
    to_sum = [i for i in range(2000)]

    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))

I have to agree with user Booboo that this particular implementation is not the best (it may actually be the worst) for a high-performance parallel sum function. If you are looking for a parallel sum function, read his answer. If you are interested in why the program hangs on large queues, using a manager should solve your problem. If you have an implementation of a parallel reduce function (one that handles the poison-pill race condition better), please share it so that those specifically looking for one can find a better implementation. Best regards, Tary
