python-3.x 如何用异步并行计算?

3npbholx  于 2023-03-20  发布在  Python
关注(0)|答案(2)|浏览(186)

我有一个代码块需要很长时间才能执行,并且占用大量的CPU资源。我想多次运行该代码块,并希望充分利用CPU的能力。通过查看asyncio,我了解到它主要用于异步通信,但也是异步任务的通用工具。
在下面的例子中,time.sleep(y)是我想要运行的代码的占位符。在这个例子中,每个协同例程都是一个接一个地执行的,执行大约需要8秒。

import asyncio
import logging
import time

async def _do_compute_intense_stuff(x, y, logger):
    logger.info('Getting it started...')
    for i in range(x):
        time.sleep(y)
    logger.info('Almost done')
    return x * y

logging.basicConfig(format='[%(name)s, %(levelname)s]: %(message)s', level='INFO')
logger = logging.getLogger(__name__)
loop = asyncio.get_event_loop()
co_routines = [
    asyncio.ensure_future(_do_compute_intense_stuff(2, 1, logger.getChild(str(i)))) for i in range(4)]
logger.info('Made the co-routines')
responses = loop.run_until_complete(asyncio.gather(*co_routines))
logger.info('Loop is done')
print(responses)

当我用asyncio.sleep(y)替换time.sleep(y)时,它几乎立即返回,而用await asyncio.sleep(y)则需要大约2秒。
是否有一种方法可以使用这种方法并行化我的代码,或者我应该使用multiprocessingthreading?我是否需要将time.sleep(y)放入线程中?

avwztpqn

avwztpqn1#

执行器使用多线程来完成这一点(或者多处理,如果你喜欢的话)。Asyncio用于优化你经常等待输入、输出操作运行的代码。有时这可以是写入文件或加载网站。
然而,对于CPU密集型操作(不只是依赖于等待IO),建议使用类似于线程的东西,在我看来,concurrent.futures为此提供了一个非常好的 Package 器,它类似于Asyncio的 Package 器。
Asyncio.sleep之所以能让你的代码运行得更快,是因为它启动了函数,然后开始检查协程,看看它们是否准备好了。这对于CPU密集型操作来说扩展性不好,因为没有IO要等待。
要将下面的示例从多处理更改为多线程,只需将ProcessPoolExecutor更改为ThreadPoolExecutor
下面是一个多处理示例:

import concurrent.futures
import time

def a(z):
    time.sleep(1)
    return z*12

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(a, i) for i in range(5)}
        for future in concurrent.futures.as_completed(futures):
            data = future.result()
            print(data)

这是documentation for executors中提供的示例的简化版本。

pbgvytdp

pbgvytdp2#

简单示例

本示例摘自https://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/
这对我帮助很大,还有一个“坏例子”--这对我帮助更大。

import aiohttp
import asyncio
import async_timeout
import os

async def download_coroutine(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            filename = os.path.basename(url)
            with open(filename, 'wb') as f_handle:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    f_handle.write(chunk)
            return await response.release()

async def main(loop):
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download_coroutine(session, url) for url in urls]
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

相关问题