我正在做一些文件解析,这是一个CPU绑定的任务。无论我在进程中抛出多少文件,它使用的RAM都不超过50 MB。该任务是可并行的,我已经将其设置为使用下面的并发期货来解析每个文件作为一个单独的进程:
from concurrent import futures
with futures.ProcessPoolExecutor(max_workers=6) as executor:
# A dictionary which will contain a list the future info in the key, and the filename in the value
jobs = {}
# Loop through the files, and run the parse function for each file, sending the file-name to it.
# The results of can come back in any order.
for this_file in files_list:
job = executor.submit(parse_function, this_file, **parser_variables)
jobs[job] = this_file
# Get the completed jobs whenever they are done
for job in futures.as_completed(jobs):
# Send the result of the file the job is based on (jobs[job]) and the job (job.result)
results_list = job.result()
this_file = jobs[job]
# delete the result from the dict as we don't need to store it.
del jobs[job]
# post-processing (putting the results into a database)
post_process(this_file, results_list)
问题是,当我使用futures运行这个程序时,RAM的使用量激增,不久我就用完了,Python也崩溃了。这可能在很大程度上是因为parse_function的结果大小为几MB。一旦结果通过post_processing
,应用程序就不再需要它们了。正如您所看到的,我尝试使用del jobs[job]
来清除jobs
中的项,但这并没有产生任何影响,内存使用量保持不变,并且似乎以相同的速度增加。
我还确认了这不是因为它只使用一个进程来等待post_process
函数,再加上一个time.sleep(1)
。
futures文档中没有任何关于内存管理的内容,虽然简短的搜索表明它以前在futures的实际应用程序中出现过(Clear memory in python loop和http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures)-答案并没有转化为我的用例(它们都与超时等有关)。
那么,如何在不耗尽RAM的情况下使用Concurrent futures?(Python 3.5)
4条答案
按热度按时间tjvv9vkg1#
我会开枪(可能是一个错误的猜测。..)
您可能需要一点一点地提交您的工作,因为在每次提交时,您都要复制parser_variables,这可能最终会占用您的RAM。
下面是在有趣的部分使用“〈----”的工作代码
4smxwvx52#
尝试像这样将
del
添加到代码中:piah890a3#
查看
concurrent.futures.as_completed()
函数,我了解到它足以确保不再有任何对未来的引用。如果在得到结果后立即分发此引用,将最大限度地减少内存使用。我使用生成器表达式来存储我的
Future
示例,因为我关心的所有内容都已经在future的结果中返回了(基本上就是分派工作的状态)。其他实现使用dict
,例如在您的例子中,因为您不返回输入文件名作为线程工作程序结果的一部分。使用生成器表达式意味着一旦产生结果,就不再有任何对
Future
的引用。在内部,as_completed()
已经在向您生成完整的Future
后删除了自己的引用。编辑:从使用
set
和删除条目简化为简单地使用生成器表达式。ar7v8xwq4#
我也有同样的问题
在我的情况下,我需要启动数百万个线程。对于python2,我会使用dict自己编写一个线程池。但是在python3中,当我动态删除已完成的线程时,我遇到了以下错误:
所以我必须使用concurrent。futures,起初我这样编码:
但很快内存就耗尽了,因为只有在所有线程完成后才释放内存。我需要删除完成线程之前,许多线程开始。所以我在这里读了文档: www.example.com
找到那个执行者。shutdown(wait=True)可能是我需要的。这是我的最终解决方案:
如果使用with语句,则可以避免显式调用此方法,因为这将关闭Executor(就像Executor一样等待。调用shutdown(),wait设置为True)。
更新:
找到了一个更好的解决方案: