python多处理池的卡住

v7pvogib  于 2023-10-21  发布在  Python
关注(0)|答案(1)|浏览(149)

我有多处理脚本与“池.imap_unordered”.
当脚本卡住时,我遇到了一个问题,但我检查了CPU使用率-没有发生任何事情(通过Ubuntu上的“top”命令)。
进入屏幕会话,我看到一个进程在执行时卡住了。
据我所知,我不使用有问题的fork()方法。
此外,当我在少量返回数据上运行脚本时,不会发生冻结(但它们发生在相对较小的数量上-表< 5 MB的CSV)。
有没有人能建议一下到底能帮上什么忙?信号量,锁或其他东西.我尝试了用更少的数据处理更多的过程-没有帮助。可能更容易更改为Python并行。

import multiprocessing

def my_func(df):

   # modify df here
   # df = df.head(1)
   return df

if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.Pool(processes = (multiprocessing.cpu_count() - 1)) as pool:
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
    final_df = pd.concat(out)

我也试过

import multiprocessing

def my_func(df):

   # modify df here
   # df = df.head(1)
   return df

if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.get_context("spawn").Pool(processes = (multiprocessing.cpu_count() - 1)) as pool:
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
    final_df = pd.concat(out)
vmdwslir

vmdwslir1#

你说你的代码不适合大数据。我没有太多的描述,但我将工作的假设,缺乏内存是你的问题的原因。如果是这样,你的dataframe有多大,你有多少内存来运行用户应用程序?除了获得更多内存之外,可能没有其他解决方案。但在此之前,我们可以问:在当前的处理中,可以做些什么来限制内存使用?这是否足以使程序现在运行?下面是对情况的一个相当长的分析,我提出了一些你可以做的修改,假设我的分析是正确的,这些修改可能会有所帮助。
使用方法imap_unordered的问题之一是,您几乎无法控制最初提交到池的任务队列的任务数量以及在任务完成后继续提交的任务数量。如果你有(1)一个大的dataframe,(2)大量的任务需要提交,(3)整个dataframe被传递给每个任务的worker函数,那么dataframe最初会在任务队列上复制很多次,而你几乎没有控制权,你可能很快就会耗尽内存。那么解决方案将是以某种方式限制在任何时间点可以在任务队列上等待处理的任务数量。
但是 that 并不完全是您的情况,因为您并没有将每个提交任务的整个dfdataframe传递给my_func,而是一个子集。即使groupby方法创建的所有子集都位于任务队列中,任务队列的存储需求也将近似等于整个dataframe的大小。然后,当这些组被处理并返回结果时,任务队列占用的存储将减少,因为out列表所需的存储将增加。总的存储需求可能不会随着任务的处理而改变那么多。也就是说,任务队列的大小将随着out大小的增加而减小。但是,当您创建final_df时,您同时需要存储dfoutfinal_df。因此,您目前需要能够在内存中保存dataframe的3个示例。
最简单的方法是在执行out列表的连接之前删除初始dfdataframe。这能解决你的记忆问题吗?我不知道,但现在我们只需要足够的内存来保存两个大型dataframe的副本。
我们可以做的另外一件事是控制任务排队的速率,从而限制任务队列所需的存储。下面的代码显示了如何通过将方法imap_unordered替换为指定回调的apply_async来实现这一点。我们使用一个初始化为Nmultiprocessing.BoundedSemaphore,其中N是我们希望在任何时候排队的最大任务数。在调用apply_async提交下一个任务之前,主进程必须先acquire信号量。它将能够这样做N次没有阻塞。当任务完成时,我们的回调函数在信号量上执行release,允许提交新任务。**但即使进行了此更改,您仍然需要足够的存储空间来容纳dataframe的两个副本,此更改可能不会有帮助。但是如果实际的my_func返回的值小于传递的df,那么这应该会有帮助。**如果代码现在可以工作,您可以简单地注解掉semaphore.acquire()semaphore.release()调用以删除此更改,并查看它是否仍然可以工作。

import multiprocessing
import pandas as pd

def my_func(df):

   # modify df here
   # df = df.head(1)
   return df

if __name__ == "__main__":
    # Use number of cores or possibly a smaller number if
    # we still have memory issues:
    POOL_SIZE = multiprocessing.cpu_count()

    # So that when a task completes there is always another task
    # on the input queue ready to run. If memory is still an issue,
    # then set SEMAPORE_SIZE = POOL_SIZE
    SEMAPHORE_SIZE = 2 * POOL_SIZE

    semaphore = multiprocessing.BoundedSemaphore(SEMAPHORE_SIZE)
    out = []

    def my_callback(result_df):
        out.append(result_df)
        # Allow another task to be submitted:
        semaphore.release()

    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    pool = multiprocessing.Pool(processes=POOL_SIZE)
    for _, group in df.groupby("a"):
        semaphore.acquire()
        pool.apply_async(my_func, args=(group,), callback=my_callback)
    # Wait for all tasks to complete:
    pool.close()
    pool.join()

    del df # reclaim storage we do not need any more
    final_df = pd.concat(out)
    print(final_df)

图纸:

a  b  c
2  1  6  6
3  1  4  4
0  2  4  4
1  2  5  5
4  3  5  5
5  3  6  6

相关问题