Python 3中的并发futures与多处理

cedebl8k  于 2022-11-26  发布在  Python
关注(0)|答案(6)|浏览(175)

Python 3.2引入了Concurrent Futures,它似乎是旧的线程和multiprocessing模块的某种高级组合。
与旧的多处理模块相比,将此模块用于受CPU限制的任务有哪些优点和缺点?
This article表明它们更容易使用-是这样吗?

xn1cxnb4

xn1cxnb41#

我不会说concurrent.futures更“高级”--它是一个 * 更简单 * 的接口,无论您使用多线程还是多进程作为底层并行化技巧,它的工作方式都非常相同。
因此,就像几乎所有“更简单的界面”的示例一样,也涉及到许多相同的权衡:它的学习曲线较浅,这在很大程度上是因为可供学习的东西少得多;但是,因为它提供的选项较少,所以它最终可能会以更丰富的接口所不具备的方式让您感到沮丧。
到目前为止,对于CPU-bound任务来说,这是一个太少的定义,没有多大意义。对于CPython下的CPU-bound任务,您需要多个进程而不是多个线程来获得加速。(如果有的话)取决于硬件、操作系统的细节,尤其是特定任务所需的进程间通信量。所有进程间并行化技巧都依赖于相同的操作系统原语-您用来获取这些原语的高级API并不是影响底线速度的主要因素。

编辑:范例

下面是您参考的文章中显示的最终代码,但我添加了一个import语句,使其正常工作:

from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))}

下面使用multiprocessing执行完全相同的操作:

import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))}

请注意,使用multiprocessing.Pool对象作为上下文管理器的能力是在Python 3.3中添加的。
至于哪一个更容易使用,它们本质上是相同的。
一个不同之处是Pool支持如此多不同的做事方式,以至于直到您在学习曲线上爬了相当长的一段路之后,您才可能意识到它 * 可以 * 有多容易。
同样,所有这些不同的方法既是优点也是缺点。它们是优点,因为在某些情况下可能需要灵活性。它们是缺点,因为“最好只有一种明显的方法来做它”。一个完全坚持(如果可能的话)concurrent.futures的项目从长远来看可能更容易维护,因为在如何使用它的最小API方面缺乏免费的新奇。

wnrlj8wa

wnrlj8wa2#

在大多数情况下,当你需要并行处理时,你会发现concurrent.futures模块中的ProcessPoolExecutor类或multiprocessing模块中的Pool类提供了相同的功能,这取决于你的个人喜好。但是,每一个类都提供了一些功能,使某些处理更方便。我想我只需要指出其中的两个:
当提交一批任务时,您有时希望在任务结果(即返回值)可用时立即获得它们。这两种工具都提供了通知,即已提交任务的结果可通过回调机制获得:
使用multiprocessing.Pool

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def process_result(return_value):
    print(return_value)

def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

使用concurrent.futures的回调函数也可以完成同样的操作,尽管有些笨拙:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def process_result(future):
    print(future.result())

def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()

if __name__ == '__main__':
    main()

这里,每个任务都是单独提交的,并返回一个Future示例。然后,必须将回调添加到Future。最后,当调用回调时,传递的参数是已完成任务的Future示例,并且必须调用方法result以获得实际返回值。但是,对于concurrent.futures模块,实际上根本不需要使用回调函数。可以使用as_completed方法:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

通过使用字典保存Future示例,可以很容易地将返回值与worker_process的原始传递参数联系起来:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(worker_process, i): i for i in range(10)}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future] # retrieve the value that was squared
            print(i, future.result())

if __name__ == '__main__':
    main()

multiprocessing.Poolimapimap_unordered方法,后者允许任务结果以 * 任意顺序 * 返回,但不一定以完成顺序返回。这些方法被认为是map的 * 较懒 * 版本。对于方法map,如果传递的 iterable 参数没有__len__属性,它将首先被转换为list,并且如果None作为 chunksize 参数提供,则它的长度将用于计算有效的chunksize值。因此,您不能通过使用生成器或生成器表达式作为 iterable 来实现任何存储优化。但是使用方法imapimap_unorderediterable 可以是生成器或生成器表达式;它将根据需要进行迭代以产生新的任务提交。但是这需要默认的 chunksize 参数为1,因为 iterable 的长度通常是未知的。但是如果您对 iterable 的长度有一个很好的近似值,这并不妨碍您使用multiprocessing.Pool类使用的相同算法提供一个合理的值。(或 * 确切 * 大小,如下例所示):

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)

if __name__ == '__main__':
    main()

另一方面,通过imap_unorderedimap指定chunksize的能力,对于chunksize,结果 * 将 * 处于可预测的顺序,应该会使这些方法比反复调用apply_async方法更有效,因为反复调用apply_async方法实质上等效于使用块大小为1。但是如果确实需要按完成顺序处理结果,那么为了确保您应该使用带有回调函数的方法apply_async。然而,根据实验,如果您在imap_unordered中使用值为1的 chunksize,结果将按完成顺序返回。
concurrent.futures套件中ProcessPoolExecutor类别的map方法与multiprocessing套件中的Pool.imap方法在某个方面类似。此方法不会将其传递的 iterable 参数(产生器表示式)转换为清单,以便计算有效的 chunksize 值,这就是 chunksize 参数预设为1的原因,以及如果要传递较大的 iterables,则应该考虑指定一个适当的 chunksize 值。但是,与Pool.imap不同,根据我的经验,在传递给map的所有 iterables 都被完全迭代之前,不能通过迭代map返回的 iterable 来检索第一个结果。
multiprocessing.Pool类别具有apply方法,可将工作提交至集区并封锁,直到结果就绪为止。传回值只是从背景工作函式传递至apply函式的传回值。例如:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))

if __name__ == '__main__':
    main()

concurrent.futures.ProcessPoolExecutor类没有这样的等价类。您必须发出一个submit,然后对返回的Future示例调用result。这样做并不困难,但是Pool.apply方法对于适合阻塞任务提交的使用情况更方便。这种情况是当您的处理需要线程化时,因为线程中完成的大多数工作都是大量的I/O,除了一个可能对CPU有很大限制的函数。创建线程的主程序首先创建一个multiprocessing.Pool示例并将其作为参数传递给所有线程。它现在使用X1 M50 N1 X方法运行该函数,从而在另一进程中运行该代码,并释放当前进程以允许其它线程运行。
concurrent.futures模块有两个类,ProcessPoolExecutorThreadPoolExecutor,它们具有相同的接口,这是一个很好的特性,但是multiprocessing模块也有一个未公开的ThreadPool类,它具有与Pool相同的接口:

>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>

您可以使用ProcessPoolExecutor.submit(传回Future执行严修)或Pool.apply_async(传回AsyncResult执行严修)提交工作,并指定撷取结果的逾时值:

from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep

def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")

印刷品:

hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.

主进程在调用future.result(3)时会在3秒后得到一个TimeoutError异常,因为提交的任务没有在该时间段内完成。但是任务继续运行,占用了进程,并且with ProcessPoolExecutor(1) as pool:块永远不会退出,因此程序不会终止。

from multiprocessing import Pool, TimeoutError
from time import sleep

def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")

印刷品:

hanging
hanging
hanging
timeout
return from main()

然而,这一次,即使超时的任务仍在继续运行并占用进程,不阻止X1 M64 N1 X块退出,因此程序正常终止。这是因为当块退出时,Pool示例的上下文管理器将执行对terminate的调用,这将导致这与ProcessPoolExecutor示例的上下文处理程序形成对比,后者执行对shutdown(wait=True)的调用,以在其管理的块退出时等待池中所有进程的终止。如果您使用上下文处理程序来处理池终止,并且可能存在超时,则multiprocessing.Pool似乎具有优势。更新:在Python 3.9中,一个新的参数 cancel_futures 已经被添加到shutdown方法中,因此,如果你显式调用shutdown(wait=False, cancel_futures=True),而不是依赖于默认行为(默认行为是在使用上下文处理程序时隐式调用shutdown),你可以终止所有正在运行的任务和任何等待运行的任务。
但是,由于multiprocessing.Pool的上下文处理程序只调用terminate,而不是close后跟join,因此必须确保在退出with块之前已完成所有已提交的作业,例如,通过提交带有阻塞的作业,同步调用(如map),或在apply_async调用返回的AsyncResult对象上调用get,或迭代imap调用的结果,或在池示例上调用close,然后调用join
尽管在使用ProcessPoolExecutor时,在超时任务完成之前无法退出,您可以 * 取消 * 尚未运行的已提交任务的启动。在下面的演示中,我们有一个大小为1的池,因此作业只能连续运行。我们一个接一个地提交了3个作业,其中前两个作业需要3秒来运行,因为调用了time.sleep(3)。我们立即尝试取消前两个作业。第一次取消尝试失败,因为第一个作业已在运行。但由于池只有一个进程,第二个作业必须等待3秒钟,以便第一个作业完成后才能开始运行,因此取消成功。最后,作业3将在作业1完成后立即开始和结束。这将是大约3秒后,我们开始提交工作:

from concurrent.futures import ProcessPoolExecutor
import time

def worker1(i):
    time.sleep(3)
    print('Done', i)

def worker2():
    print('Hello')

def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        t = time.time()
        future1 = executor.submit(worker1, 1)
        future2 = executor.submit(worker1, 2)
        future3 = executor.submit(worker2)
        # this will fail since this task is already running:
        print(future1.cancel())
        # this will succeed since this task hasn't started (it's waiting for future1 to complete):
        print(future2.cancel())
        future3.result() # wait for completion
        print(time.time() - t)

if __name__ == '__main__':
    main()

印刷品:

False
True
Done 1
Hello
3.1249606609344482
7uzetpgm

7uzetpgm3#

除了其他答案的详细差异列表之外,我个人还遇到了一个未固定的(截至2022年11月20日)indefinite hang that can happen with multiprocess.Pool当其中一名工人以某种方式崩溃时。(在我的例子中,是cython扩展的异常,尽管其他人说当工作线程获得SIGTERM时会发生这种情况,等等)根据ProcessPoolExecutor的文档,从Python3.3开始,它对此是健壮。

nqwrtyyt

nqwrtyyt4#

根据我的经验,与concurrent. futures相比,我在多处理模块方面遇到了很多问题。(但这是在Windows操作系统上)
我能看到的两个主要区别是:
1.多处理模块中频繁挂起
1.并发futures有一种相对简单的执行方式,即获取结果,跟踪子进程etc.is非常简单。
示例:(正在获取结果)

with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(some_function, parameter_to_be_passed) 
    print(f1.result())

因此,如果从some_function()返回任何值,可以直接使用f1.result()捕获/存储它。
如果您在Linux系统上运行,则可能不会发生挂起,但“多处理”模块中的执行复杂性仍然较高。
话虽如此,还需要注意的是,我的任务是高度CPU密集型任务。
就个人而言,我推荐并发期货。

bqf10yzr

bqf10yzr5#

我喜欢concurrent.futures,主要是因为迭代器的多个函数参数:multiprocessing在获取一个函数的多个参数时有些笨拙(没有istarmap()-starmap()的等价物):

import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    return power_plus_one(*t)

with mp.Pool() as pool:
    r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))

print(r)

我发现imap()/imap_unordered()对于像tqdm这样的进度条或大型计算的时间估计非常有用。在concurrents.futures中,这是非常方便的:

def power_plus_one(x, y):
    return (x**y) + 1

o = dict() # dict for output

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
    for future in concurrent.futures.as_completed(futures):
        i = futures[future]
        o[i] = future.result()
print(o)

我也喜欢作为dict.:)的方便的结果Map
使用tqdm,您可以轻松地:

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...
1dkrff03

1dkrff036#

concurrent.futures给予了更多控制,例如:

# Created by BaiJiFeiLong@gmail.com at 2021/10/19 10:37

import concurrent.futures
import multiprocessing.pool
import random
import threading
import time

def hello(name):
    time.sleep(random.random())
    return f"Hello {name} {threading.current_thread()} "

print("ThreadPool:")
pool = multiprocessing.pool.ThreadPool(4)
for args, result in pool.imap_unordered(lambda x: (x, hello(x)), range(10)):
    print(args, "=>", result)

print("\nThreadPoolExecutor:")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = {executor.submit(hello, x): x for x in range(10)}
for future in concurrent.futures.as_completed(futures):
    print(futures[future], "=>", future.result()

输出示例:

ThreadPool:
1 => Hello 1 <DummyProcess(Thread-2, started daemon 29700)>
0 => Hello 0 <DummyProcess(Thread-1, started daemon 29688)>
2 => Hello 2 <DummyProcess(Thread-3, started daemon 19680)>
6 => Hello 6 <DummyProcess(Thread-3, started daemon 19680)>
3 => Hello 3 <DummyProcess(Thread-4, started daemon 33028)>
4 => Hello 4 <DummyProcess(Thread-2, started daemon 29700)>
5 => Hello 5 <DummyProcess(Thread-1, started daemon 29688)>
9 => Hello 9 <DummyProcess(Thread-2, started daemon 29700)>
8 => Hello 8 <DummyProcess(Thread-4, started daemon 33028)>
7 => Hello 7 <DummyProcess(Thread-3, started daemon 19680)>

ThreadPoolExecutor:
0 => Hello 0 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
1 => Hello 1 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
2 => Hello 2 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
4 => Hello 4 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
3 => Hello 3 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
8 => Hello 8 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
5 => Hello 5 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
6 => Hello 6 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
7 => Hello 7 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
9 => Hello 9 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>

相关问题