python-3.x 多处理与多线程与异步

juzqafwq  于 2022-12-01  发布在  Python
关注(0)|答案(8)|浏览(148)

我发现在Python 3.4中有几个不同的多处理/线程库:multiprocessingthreadingasyncio得比较.
但我不知道该用哪一个,也不知道是不是“推荐的”。它们做的事情是一样的,还是不同的?如果是的话,哪一个是做什么的?我想在我的电脑上写一个使用多核的程序。但我不知道我应该学习哪个库。

xeufq47z

xeufq47z1#

TL;DR

做出正确的选择:

我们已经讨论了最流行的并发形式。但问题仍然存在--什么时候应该选择哪一种?这实际上取决于用例。根据我的经验(和阅读),我倾向于遵循以下伪代码:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
        print("Use Threads")
else:
    print("Multi Processing")
  • CPU限制=〉多处理
  • I/O受限、快速I/O、有限数量的连接=〉多线程
  • I/O受限,I/O速度慢,连接多=〉Asyncio

Reference
[]:

  • 如果您有一个长调用方法(例如,包含休眠时间或惰性I/O的方法),最好的选择是asyncioTwistedTornado方法(协程方法),它与单个线程一起作为并发工作。
  • asyncio适用于 Python3.4 和更高版本。
  • TornadoTwisted已就绪,因为 Python2.7
  • uvloop是超快的asyncio事件循环(uvloop使asyncio的速度提高2- 4倍)。

[更新(2019年)]:

  • Japranto(GitHub)是一个基于uvloop的非常快速的流水线HTTP服务器。
bqf10yzr

bqf10yzr2#

它们用于(略微)不同的目的和/或要求。CPython(典型的主线Python实现)仍然具有global interpreter lock,因此多线程应用程序(现在实现并行处理的标准方法)是次优的。这就是为什么multiprocessing * 可能 * 优于threading的原因。但并不是每个问题都可以有效地分解成[几乎独立的]部分,因此可能需要大量的进程间通信,这就是为什么multiprocessing通常不优于threading的原因。
asyncio(这种技术不仅在Python中可用,其他语言和/或框架也有,例如Boost.ASIO)是一种方法,可以有效地处理来自多个同步源的大量I/O操作,而不需要并行代码执行。因此,它只是一种特定任务的解决方案(确实是一种很好的解决方案!),而不是一般的并行处理。

r9f1avp5

r9f1avp53#

multiprocessing中,您可以利用多个CPU来分配计算。由于每个CPU都是并行运行的,因此您可以有效地同时运行多个任务。您可能希望对CPU-bound任务使用多处理。例如,尝试计算一个巨大列表中所有元素的总和。如果您的机器有8个内核,您可以将列表“切割”为8个更小的列表,并在单独的内核上分别计算每个列表的总和,然后将这些数字相加。这样做可以获得约8倍的加速。
在(multi)threading中,你不需要多个CPU。想象一个程序向网络发送大量HTTP请求。如果你使用单线程程序,它会在每个请求时停止执行(阻塞),等待响应,然后在收到响应后继续执行。这里的问题是,在等待某个外部服务器完成任务时,你的CPU实际上没有工作;在此期间它实际上可以做一些有用的工作!修复方法是使用线程-你可以创建许多线程,每个线程负责从Web请求一些内容。线程的好处是,即使它们在一个CPU上运行,CPU会不时“冻结”一个线程的执行,并跳转到执行另一个线程(这称为上下文切换,它经常发生在非因此,如果您的任务是I/O bound-请使用线程。
asyncio本质上是线程化,不是CPU,而是作为程序员(或者实际上是你的应用程序)的你,决定何时何地发生上下文切换。在Python中,你使用await关键字来暂停你的协程(使用async关键字定义)的执行。

deikduxw

deikduxw4#

这是基本的想法:
是否为IO-绑定?-----------〉使用asyncio

CPU-重吗?---------〉使用multiprocessing

其他?---------------------〉使用**threading**
因此,除非您遇到IO/CPU问题,否则基本上应坚持使用线程。

xj3cbfub

xj3cbfub5#

许多答案都建议如何只选择一个选项,但为什么不能使用所有3个选项?在这个答案中,我将解释如何使用asyncio来管理组合所有3种形式的并发,以及稍后在需要时在它们之间轻松交换
简短的答案
许多第一次使用Python并发的开发人员最终会使用processing.Processthreading.Thread。然而,这些都是低级API,它们已经被concurrent.futures模块提供的高级API合并在一起。此外,派生进程和线程有开销,比如需要更多的内存,这个问题困扰着我下面展示的一个例子。在某种程度上,concurrent.futures为您管理这一点,这样您就不能像繁殖一千个进程那样,只繁殖几个进程,然后在每次一个进程完成时重用这些进程,从而使计算机崩溃。
这些高级API是通过concurrent.futures.Executor提供的,然后由concurrent.futures.ProcessPoolExecutorconcurrent.futures.ThreadPoolExecutor实现。在大多数情况下,您应该在multiprocessing.Processthreading.Thread上使用这些API,因为在将来使用concurrent.futures时,从一个API切换到另一个API会更容易,并且您不必了解每个API的详细差异。
由于它们共享一个统一的接口,您还会发现使用multiprocessingthreading的代码通常会使用concurrent.futuresasyncio也不例外,它通过以下代码提供了使用它的方法:

import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")

async def run_in_executor(
    executor: Optional[Executor],
    func: Callable[..., T],
    /,
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Run `func(*args, **kwargs)` asynchronously, using an executor.

    If the executor is None, use the default ThreadPoolExecutor.
    """
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        partial(func, *args, **kwargs),
    )

# Example usage for running `print` in a thread.
async def main():
    await run_in_executor(None, print, "O" * 100_000)

asyncio.run(main())

事实上,将threadingasyncio一起使用是如此普遍,以至于在Python 3.9中,他们添加了asyncio.to_thread(func, *args, **kwargs),以缩短默认的ThreadPoolExecutor

长长的答案#

这种方法有什么缺点吗?

是的。对于asyncio,最大的缺点是异步函数和同步函数不一样。如果你没有从一开始就考虑asyncio,这会给asyncio的新用户带来很多麻烦,并导致大量的返工。
另一个缺点是,你的代码的用户也将被迫使用asyncio。所有这些必要的返工通常会让第一次使用asyncio的用户感到很不舒服。

这样做是否有任何非性能优势?

是的。类似于使用concurrent.futures在其统一接口方面优于threading.Threadmultiprocessing.Process的原因,这种方法可以被视为从Executor到异步函数的进一步抽象。您可以从使用asyncio开始,如果以后您发现它的一部分,则需要使用threadingmultiprocessing。您可以使用asyncio.to_threadrun_in_executor。同样,您可能会在以后发现您试图使用线程运行的异步版本已经存在,因此您可以轻松地从使用threading后退一步,转而使用asyncio

这样做有何性能优势?

是的......也可以是否定的。这最终取决于任务。在某些情况下,它可能没有帮助(尽管它可能没有伤害),而在其他情况下,它可能有很大帮助。这个答案的其余部分提供了一些解释,为什么使用asyncio来运行Executor可能是有利的。

-组合多个执行器和其他异步代码

asyncio本质上提供了明显更多的并发控制,但代价是您需要更多地控制并发。如果您想同时运行使用ThreadPoolExecutor的一些代码和使用ProcessPoolExecutor的一些其他代码,使用同步代码管理这一点并不容易,但使用asyncio非常容易。

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

async def with_processing():
    with ProcessPoolExecutor() as executor:
        tasks = [...]
        for task in asyncio.as_completed(tasks):
            result = await task
            ...

async def with_threading():
    with ThreadPoolExecutor() as executor:
        tasks = [...]
        for task in asyncio.as_completed(tasks):
            result = await task
            ...

async def main():
    await asyncio.gather(with_processing(), with_threading())

asyncio.run(main())

**这是如何工作的?**基本上,asyncio要求执行程序运行它们的函数。然后,当执行程序运行时,asyncio将运行其他代码。例如,ProcessPoolExecutor启动一系列进程,然后在等待这些进程完成时,ThreadPoolExecutor启动一系列线程。asyncio将检查这些执行器,并在它们完成时收集它们的结果。此外,如果您有其他使用asyncio的代码,您可以在等待进程和线程完成的同时运行它们。

-缩小需要执行器的代码段

代码中有很多执行器并不常见,但我所看到的一个常见问题是,当人们使用线程/进程时,他们会将整个代码都塞进一个线程/进程中,期望它能工作。例如,我曾经看到过以下代码(大约):

from concurrent.futures import ThreadPoolExecutor
import requests

def get_data(url):
    return requests.get(url).json()["data"]

urls = [...]

with ThreadPoolExecutor() as executor:
    for data in executor.map(get_data, urls):
        print(data)

这段代码的有趣之处在于,有并发时比没有并发时要慢。为什么?因为生成的json很大,让许多线程占用大量内存是灾难性的。幸运的是,解决方案很简单:

from concurrent.futures import ThreadPoolExecutor
import requests

urls = [...]

with ThreadPoolExecutor() as executor:
    for response in executor.map(requests.get, urls):
        print(response.json()["data"])

现在一次只将一个json卸载到内存中,一切都很好。
"教训是什么"
您不应该尝试将所有代码都放到线程/进程中,而应该关注代码中实际需要并发的部分。

但是如果get_data不是一个如此简单的函数呢?如果我们必须在函数中间的某个位置应用executor呢?这就是asyncio的用武之地:

import asyncio
import requests

async def get_data(url):
    # A lot of code.
    ...
    # The specific part that needs threading.
    response = await asyncio.to_thread(requests.get, url, some_other_params)
    # A lot of code.
    ...
    return data

urls = [...]

async def main():
    tasks = [get_data(url) for url in urls]
    for task in asyncio.as_completed(tasks):
        data = await task
        print(data)

asyncio.run(main())

尝试用concurrent.futures做同样的事情绝不是件好事。你可以使用回调、队列等,但是它比基本的asyncio代码要难管理得多。

nwlls2ji

nwlls2ji6#

已经有很多好的答案了。不能详细说明何时使用每一个。这是两个更有趣的组合。多处理+异步:https://pypi.org/project/aiomultiprocess/
它的设计用例是highio,但仍然使用尽可能多的可用内核。Facebook使用这个库编写了某种基于python的文件服务器。Asyncio允许IO绑定流量,但多处理允许多个事件循环和多个内核上的线程。
回购协议中的Ex代码:

import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main():
    urls = ["https://jreese.sh", ...]
    async with Pool() as pool:
        async for result in pool.map(get, urls):
            ...  # process result
            
if __name__ == '__main__':
    # Python 3.7
    asyncio.run(main())
    
    # Python 3.6
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())

只是和补充在这里,不会工作在说jupyter笔记本电脑很好,因为笔记本电脑已经有一个异步循环运行。只是一个小注意,你不要拉你的头发了。

rqmkfv5c

rqmkfv5c7#

*多处理可以并行运行。
*多线程异步不能并行运行。

借助英特尔(R)酷睿(TM)i7- 8700 K CPU@3.70 GHz32.0 GB RAM,我使用2个进程2个线程2个异步任务计算了2100000之间有多少个素数,如下所示。* 这是CPU极限计算
| 多重行程|多执行绪|异步的|
| - -|- -|- -|
| 二十三秒八十七|四十五秒二十四|四十四秒七十七|
因为多处理可以并行运行,所以多处理多线程异步快一倍,如上所示。
我用了3套代码如下:

多重行程:

# "process_test.py"

from multiprocessing import Process
import time
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

if __name__ == "__main__": # This is needed to run processes on Windows
    process_list = []

    for _ in range(0, 2): # 2 processes
        process = Process(target=test)
        process_list.append(process)

    for process in process_list:
        process.start()

    for process in process_list:
        process.join()

    print(round((time.time() - start_time), 2), "seconds") # 23.87 seconds

结果:

...
9592
9592
23.87 seconds

多线程:

# "thread_test.py"

from threading import Thread
import time
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

thread_list = []

for _ in range(0, 2): # 2 threads
    thread = Thread(target=test)
    thread_list.append(thread)
    
for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

print(round((time.time() - start_time), 2), "seconds") # 45.24 seconds

结果:

...
9592
9592
45.24 seconds

阿森西奥:

# "asyncio_test.py"

import asyncio
import time
start_time = time.time()

async def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

async def call_tests():
    tasks = []

    for _ in range(0, 2): # 2 asyncio tasks
        tasks.append(test())

    await asyncio.gather(*tasks)

asyncio.run(call_tests())

print(round((time.time() - start_time), 2), "seconds") # 44.77 seconds

结果:

...
9592
9592
44.77 seconds
qcbq4gxm

qcbq4gxm8#

多处理每个进程都有自己的Python解释器,并且可以在处理器的单独内核上运行。Python多处理是一个包,它支持使用类似于线程模块的API来生成进程。多处理包提供真正的并行性,通过使用子进程而不是线程来有效地绕过全局解释器锁。

当您有CPU密集型工作时,请使用多重行程。

多线程Python的多线程允许你在进程中产生多个线程。这些线程可以共享相同的内存和进程资源。在CPython中,由于全局解释器锁定,在任何给定的时间只有一个线程可以运行,因此你不能使用多个内核。由于GIL的限制,Python中的多线程不能提供真正的并行。
AsyncioAsyncio采用协作多任务概念。Asyncio任务在同一线程上运行,因此没有并行性,但它为开发人员提供了更好的控制,而不是像多线程中那样由操作系统控制。

在这个link上有一个关于asyncio相对于线程的优势的很好的讨论。
Lei Mao写了一篇关于Python并发here的博客
Multiprocessing VS Threading VS AsyncIO in Python Summary

相关问题