python-3.x 使用并发期货而不会耗尽RAM

9w11ddsr  于 2023-05-02  发布在  Python
关注(0)|答案(4)|浏览(114)

我正在做一些文件解析,这是一个CPU绑定的任务。无论我在进程中抛出多少文件,它使用的RAM都不超过50 MB。该任务是可并行的,我已经将其设置为使用下面的并发期货来解析每个文件作为一个单独的进程:

from concurrent import futures
    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        # A dictionary which will contain a list the future info in the key, and the filename in the value
        jobs = {}

        # Loop through the files, and run the parse function for each file, sending the file-name to it.
        # The results of can come back in any order.
        for this_file in files_list:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            # Send the result of the file the job is based on (jobs[job]) and the job (job.result)
            results_list = job.result()
            this_file = jobs[job]

            # delete the result from the dict as we don't need to store it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)

问题是,当我使用futures运行这个程序时,RAM的使用量激增,不久我就用完了,Python也崩溃了。这可能在很大程度上是因为parse_function的结果大小为几MB。一旦结果通过post_processing,应用程序就不再需要它们了。正如您所看到的,我尝试使用del jobs[job]来清除jobs中的项,但这并没有产生任何影响,内存使用量保持不变,并且似乎以相同的速度增加。
我还确认了这不是因为它只使用一个进程来等待post_process函数,再加上一个time.sleep(1)
futures文档中没有任何关于内存管理的内容,虽然简短的搜索表明它以前在futures的实际应用程序中出现过(Clear memory in python loophttp://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures)-答案并没有转化为我的用例(它们都与超时等有关)。
那么,如何在不耗尽RAM的情况下使用Concurrent futures?(Python 3.5)

tjvv9vkg

tjvv9vkg1#

我会开枪(可能是一个错误的猜测。..)
您可能需要一点一点地提交您的工作,因为在每次提交时,您都要复制parser_variables,这可能最终会占用您的RAM。
下面是在有趣的部分使用“〈----”的工作代码

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary which will contain a list the future info in the key, and the filename in the value
    jobs = {}

    # Loop through the files, and run the parse function for each file, sending the file-name to it.
    # The results of can come back in any order.
    files_left = len(files_list) #<----
    files_iter = iter(files_list) #<------

    while files_left:
        for this_file in files_iter:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file
            if len(jobs) > MAX_JOBS_IN_QUEUE:
                break #limit the job submission for now job

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            files_left -= 1 #one down - many to go...   <---

            # Send the result of the file the job is based on (jobs[job]) and the job (job.result)
            results_list = job.result()
            this_file = jobs[job]

            # delete the result from the dict as we don't need to store it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)
            break; #give a chance to add more jobs <-----
4smxwvx5

4smxwvx52#

尝试像这样将del添加到代码中:

for job in futures.as_completed(jobs):
    del jobs[job]  # or `val = jobs.pop(job)`
    # del job  # or `job._result = None`
piah890a

piah890a3#

查看concurrent.futures.as_completed()函数,我了解到它足以确保不再有任何对未来的引用。如果在得到结果后立即分发此引用,将最大限度地减少内存使用。
我使用生成器表达式来存储我的Future示例,因为我关心的所有内容都已经在future的结果中返回了(基本上就是分派工作的状态)。其他实现使用dict,例如在您的例子中,因为您不返回输入文件名作为线程工作程序结果的一部分。
使用生成器表达式意味着一旦产生结果,就不再有任何对Future的引用。在内部,as_completed()已经在向您生成完整的Future后删除了自己的引用。

futures = (executor.submit(thread_worker, work) for work in workload)

for future in concurrent.futures.as_completed(futures):
    output = future.result()
    ...  # on next loop iteration, garbage will be collected for the result data, too

编辑:从使用set和删除条目简化为简单地使用生成器表达式。

ar7v8xwq

ar7v8xwq4#

我也有同样的问题
在我的情况下,我需要启动数百万个线程。对于python2,我会使用dict自己编写一个线程池。但是在python3中,当我动态删除已完成的线程时,我遇到了以下错误:

RuntimeError: dictionary changed size during iteration

所以我必须使用concurrent。futures,起初我这样编码:

from concurrent.futures import ThreadPoolExecutor
......
if __name__ == '__main__':
    all_resouces = get_all_resouces()
    with ThreadPoolExecutor(max_workers=50) as pool:
        for r in all_resouces:
            pool.submit(handle_resource, *args)

但很快内存就耗尽了,因为只有在所有线程完成后才释放内存。我需要删除完成线程之前,许多线程开始。所以我在这里读了文档: www.example.com
找到那个执行者。shutdown(wait=True)可能是我需要的。这是我的最终解决方案:

from concurrent.futures import ThreadPoolExecutor
......
if __name__ == '__main__':
    all_resouces = get_all_resouces()
    i = 0
    while i < len(all_resouces):
        with ThreadPoolExecutor(max_workers=50) as pool:
            for r in all_resouces[i:i+1000]:
                pool.submit(handle_resource, *args)
            i += 1000

如果使用with语句,则可以避免显式调用此方法,因为这将关闭Executor(就像Executor一样等待。调用shutdown(),wait设置为True)。

更新:

找到了一个更好的解决方案:

futures: Set[Future] = set()
with ThreadPoolExecutor(max_workers) as thread_pool:
    for resouce in list/set/iterator/generator:
        if len(futures) >= 1000:
            """
            release a completed future when more than 1000 futures created, then submit(create) a new one.
            this will prevent memory exhausted when millions of futures needed
            """
            completed_future = next(as_completed(futures))
            futures.remove(completed_future)
        future = thread_pool.submit(resouce_handler, args)
        futures.add(future)

相关问题