Python 3.2引入了Concurrent Futures,它似乎是旧的线程和multiprocessing模块的某种高级组合。
与旧的多处理模块相比,将此模块用于受CPU限制的任务有哪些优点和缺点?
This article表明它们更容易使用-是这样吗?
Python 3.2引入了Concurrent Futures,它似乎是旧的线程和multiprocessing模块的某种高级组合。
与旧的多处理模块相比,将此模块用于受CPU限制的任务有哪些优点和缺点?
This article表明它们更容易使用-是这样吗?
6条答案
按热度按时间xn1cxnb41#
我不会说
concurrent.futures
更“高级”--它是一个 * 更简单 * 的接口,无论您使用多线程还是多进程作为底层并行化技巧,它的工作方式都非常相同。因此,就像几乎所有“更简单的界面”的示例一样,也涉及到许多相同的权衡:它的学习曲线较浅,这在很大程度上是因为可供学习的东西少得多;但是,因为它提供的选项较少,所以它最终可能会以更丰富的接口所不具备的方式让您感到沮丧。
到目前为止,对于CPU-bound任务来说,这是一个太少的定义,没有多大意义。对于CPython下的CPU-bound任务,您需要多个进程而不是多个线程来获得加速。(如果有的话)取决于硬件、操作系统的细节,尤其是特定任务所需的进程间通信量。所有进程间并行化技巧都依赖于相同的操作系统原语-您用来获取这些原语的高级API并不是影响底线速度的主要因素。
编辑:范例
下面是您参考的文章中显示的最终代码,但我添加了一个import语句,使其正常工作:
下面使用
multiprocessing
执行完全相同的操作:请注意,使用
multiprocessing.Pool
对象作为上下文管理器的能力是在Python 3.3中添加的。至于哪一个更容易使用,它们本质上是相同的。
一个不同之处是
Pool
支持如此多不同的做事方式,以至于直到您在学习曲线上爬了相当长的一段路之后,您才可能意识到它 * 可以 * 有多容易。同样,所有这些不同的方法既是优点也是缺点。它们是优点,因为在某些情况下可能需要灵活性。它们是缺点,因为“最好只有一种明显的方法来做它”。一个完全坚持(如果可能的话)
concurrent.futures
的项目从长远来看可能更容易维护,因为在如何使用它的最小API方面缺乏免费的新奇。wnrlj8wa2#
在大多数情况下,当你需要并行处理时,你会发现
concurrent.futures
模块中的ProcessPoolExecutor
类或multiprocessing
模块中的Pool
类提供了相同的功能,这取决于你的个人喜好。但是,每一个类都提供了一些功能,使某些处理更方便。我想我只需要指出其中的两个:当提交一批任务时,您有时希望在任务结果(即返回值)可用时立即获得它们。这两种工具都提供了通知,即已提交任务的结果可通过回调机制获得:
使用
multiprocessing.Pool
:使用
concurrent.futures
的回调函数也可以完成同样的操作,尽管有些笨拙:这里,每个任务都是单独提交的,并返回一个
Future
示例。然后,必须将回调添加到Future
。最后,当调用回调时,传递的参数是已完成任务的Future
示例,并且必须调用方法result
以获得实际返回值。但是,对于concurrent.futures
模块,实际上根本不需要使用回调函数。可以使用as_completed
方法:通过使用字典保存
Future
示例,可以很容易地将返回值与worker_process
的原始传递参数联系起来:multiprocessing.Pool
有imap
和imap_unordered
方法,后者允许任务结果以 * 任意顺序 * 返回,但不一定以完成顺序返回。这些方法被认为是map
的 * 较懒 * 版本。对于方法map
,如果传递的 iterable 参数没有__len__
属性,它将首先被转换为list
,并且如果None
作为 chunksize 参数提供,则它的长度将用于计算有效的chunksize
值。因此,您不能通过使用生成器或生成器表达式作为 iterable 来实现任何存储优化。但是使用方法imap
和imap_unordered
,iterable 可以是生成器或生成器表达式;它将根据需要进行迭代以产生新的任务提交。但是这需要默认的 chunksize 参数为1,因为 iterable 的长度通常是未知的。但是如果您对 iterable 的长度有一个很好的近似值,这并不妨碍您使用multiprocessing.Pool
类使用的相同算法提供一个合理的值。(或 * 确切 * 大小,如下例所示):另一方面,通过
imap_unordered
和imap
指定chunksize
的能力,对于chunksize
,结果 * 将 * 处于可预测的顺序,应该会使这些方法比反复调用apply_async
方法更有效,因为反复调用apply_async
方法实质上等效于使用块大小为1。但是如果确实需要按完成顺序处理结果,那么为了确保您应该使用带有回调函数的方法apply_async
。然而,根据实验,如果您在imap_unordered
中使用值为1的 chunksize,结果将按完成顺序返回。concurrent.futures
套件中ProcessPoolExecutor
类别的map
方法与multiprocessing
套件中的Pool.imap
方法在某个方面类似。此方法不会将其传递的 iterable 参数(产生器表示式)转换为清单,以便计算有效的 chunksize 值,这就是 chunksize 参数预设为1的原因,以及如果要传递较大的 iterables,则应该考虑指定一个适当的 chunksize 值。但是,与Pool.imap
不同,根据我的经验,在传递给map
的所有 iterables 都被完全迭代之前,不能通过迭代map
返回的 iterable 来检索第一个结果。multiprocessing.Pool
类别具有apply
方法,可将工作提交至集区并封锁,直到结果就绪为止。传回值只是从背景工作函式传递至apply
函式的传回值。例如:concurrent.futures.ProcessPoolExecutor
类没有这样的等价类。您必须发出一个submit
,然后对返回的Future
示例调用result
。这样做并不困难,但是Pool.apply
方法对于适合阻塞任务提交的使用情况更方便。这种情况是当您的处理需要线程化时,因为线程中完成的大多数工作都是大量的I/O,除了一个可能对CPU有很大限制的函数。创建线程的主程序首先创建一个multiprocessing.Pool
示例并将其作为参数传递给所有线程。它现在使用X1 M50 N1 X方法运行该函数,从而在另一进程中运行该代码,并释放当前进程以允许其它线程运行。concurrent.futures
模块有两个类,ProcessPoolExecutor
和ThreadPoolExecutor
,它们具有相同的接口,这是一个很好的特性,但是multiprocessing
模块也有一个未公开的ThreadPool
类,它具有与Pool
相同的接口:您可以使用
ProcessPoolExecutor.submit
(传回Future
执行严修)或Pool.apply_async
(传回AsyncResult
执行严修)提交工作,并指定撷取结果的逾时值:印刷品:
主进程在调用
future.result(3)
时会在3秒后得到一个TimeoutError
异常,因为提交的任务没有在该时间段内完成。但是任务继续运行,占用了进程,并且with ProcessPoolExecutor(1) as pool:
块永远不会退出,因此程序不会终止。印刷品:
然而,这一次,即使超时的任务仍在继续运行并占用进程,不阻止X1 M64 N1 X块退出,因此程序正常终止。这是因为当块退出时,
Pool
示例的上下文管理器将执行对terminate
的调用,这将导致这与ProcessPoolExecutor
示例的上下文处理程序形成对比,后者执行对shutdown(wait=True)
的调用,以在其管理的块退出时等待池中所有进程的终止。如果您使用上下文处理程序来处理池终止,并且可能存在超时,则multiprocessing.Pool
似乎具有优势。更新:在Python 3.9中,一个新的参数 cancel_futures 已经被添加到shutdown
方法中,因此,如果你显式调用shutdown(wait=False, cancel_futures=True)
,而不是依赖于默认行为(默认行为是在使用上下文处理程序时隐式调用shutdown
),你可以终止所有正在运行的任务和任何等待运行的任务。但是,由于
multiprocessing.Pool
的上下文处理程序只调用terminate
,而不是close
后跟join
,因此必须确保在退出with
块之前已完成所有已提交的作业,例如,通过提交带有阻塞的作业,同步调用(如map
),或在apply_async
调用返回的AsyncResult
对象上调用get
,或迭代imap
调用的结果,或在池示例上调用close
,然后调用join
。尽管在使用
ProcessPoolExecutor
时,在超时任务完成之前无法退出,您可以 * 取消 * 尚未运行的已提交任务的启动。在下面的演示中,我们有一个大小为1的池,因此作业只能连续运行。我们一个接一个地提交了3个作业,其中前两个作业需要3秒来运行,因为调用了time.sleep(3)
。我们立即尝试取消前两个作业。第一次取消尝试失败,因为第一个作业已在运行。但由于池只有一个进程,第二个作业必须等待3秒钟,以便第一个作业完成后才能开始运行,因此取消成功。最后,作业3将在作业1完成后立即开始和结束。这将是大约3秒后,我们开始提交工作:印刷品:
7uzetpgm3#
除了其他答案的详细差异列表之外,我个人还遇到了一个未固定的(截至2022年11月20日)indefinite hang that can happen with multiprocess.Pool当其中一名工人以某种方式崩溃时。(在我的例子中,是cython扩展的异常,尽管其他人说当工作线程获得SIGTERM时会发生这种情况,等等)根据ProcessPoolExecutor的文档,从Python3.3开始,它对此是健壮。
nqwrtyyt4#
根据我的经验,与concurrent. futures相比,我在多处理模块方面遇到了很多问题。(但这是在Windows操作系统上)
我能看到的两个主要区别是:
1.多处理模块中频繁挂起
1.并发futures有一种相对简单的执行方式,即获取结果,跟踪子进程etc.is非常简单。
示例:(正在获取结果)
因此,如果从
some_function()
返回任何值,可以直接使用f1.result()
捕获/存储它。如果您在Linux系统上运行,则可能不会发生挂起,但“多处理”模块中的执行复杂性仍然较高。
话虽如此,还需要注意的是,我的任务是高度CPU密集型任务。
就个人而言,我推荐并发期货。
bqf10yzr5#
我喜欢
concurrent.futures
,主要是因为迭代器的多个函数参数:multiprocessing
在获取一个函数的多个参数时有些笨拙(没有istarmap()
-starmap()
的等价物):我发现
imap()
/imap_unordered()
对于像tqdm
这样的进度条或大型计算的时间估计非常有用。在concurrents.futures
中,这是非常方便的:我也喜欢作为dict.:)的方便的结果Map
使用tqdm,您可以轻松地:
1dkrff036#
concurrent.futures
给予了更多控制,例如:输出示例: