Running blocking functions (e.g. requests) concurrently but asynchronously with Python

bttbmeg0  posted 2023-01-22  in  Python

There is a function that blocks the event loop (for example, it makes an API request). I need to make a continuous stream of requests that run in parallel, but not synchronously: each next request should start before the previous one has finished.
So I found this solved question about the loop.run_in_executor() approach and used it as a starting point:

import asyncio
import requests

#blocking_request_func() defined somewhere

async def main():
    loop = asyncio.get_event_loop()
    future1 = loop.run_in_executor(None, blocking_request_func, 'param')
    future2 = loop.run_in_executor(None, blocking_request_func, 'param')
    response1 = await future1
    response2 = await future2
    print(response1)
    print(response2)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

This works fine and the requests run in parallel, but there is a problem for my task: in this example we create the whole group of tasks/futures at the start and then run the group all at once. But I need something like this:

1. Sending request_1 without awaiting its completion.
(AFTER step 1, but NOT at the same time step 1 starts):
2. Sending request_2 without awaiting its completion.
(AFTER step 2, but NOT at the same time step 2 starts):
3. Sending request_3 without awaiting its completion.
(Request 1 (or any other) gives its response)
(AFTER step 3, but NOT at the same time step 3 starts):
4. Sending request_4 without awaiting its completion.
(Request 2 (or any other) gives its response)

and so on...

I tried using asyncio.TaskGroup():

async def request_func():
    global result  # the list of request results, defined somewhere at global scope
    loop = asyncio.get_event_loop()
    result.append(await loop.run_in_executor(None, blocking_request_func, 'param'))
    await asyncio.sleep(0)  # adding or removing this line gives the same result

async def main():
    async with asyncio.TaskGroup() as tg:
       for i in range(0, 10):
           tg.create_task(request_func())

All of this leads to the same result: first we define the group of tasks/futures, and only then do we run the whole group concurrently. But is there a way to run all these requests concurrently, yet "in a stream"?
If my explanation isn't clear enough, here is an attempt to visualize it.
[image: What I have for now]
[image: What I need]

y0u0uwnf1#

I think this may be what you want. You don't have to await each request: run_in_executor returns a Future, and you can attach a callback to it instead of awaiting it.

import asyncio
import random
import time

def blockme(n):
    x = random.random() * 2.0
    time.sleep(x)
    return n, x

def cb(fut):
    print("Result", fut.result())
    
async def main():
    loop = asyncio.get_event_loop()
    futs = []
    for n in range(20):
        fut = loop.run_in_executor(None, blockme, n)
        fut.add_done_callback(cb)
        futs.append(fut)
    await asyncio.gather(*futs)
    # await asyncio.sleep(10)

asyncio.run(main())

All the requests are started up front, but they don't all execute in parallel, because the number of threads is limited by the ThreadPool. You can adjust the number of threads if you need to.
Here I simulated a blocking call with time.sleep. I needed a way to prevent main() from ending before all the callbacks have fired, so I used gather for that purpose. You could also just sleep for some fixed time, but gather is cleaner.
Apologies if I've misunderstood what you want, but I think you were trying to avoid an await on every call, and I've tried to show one way to do that.
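As a side note on adjusting the thread count: one way to do it (a sketch of my own, not part of the original answer) is to pass a custom concurrent.futures.ThreadPoolExecutor to run_in_executor instead of None, which caps how many blocking calls run at the same time:

```python
import asyncio
import concurrent.futures
import random
import time

def blockme(n):
    # Simulate a blocking request, as in the answer above.
    time.sleep(random.random() * 0.2)
    return n

async def main():
    loop = asyncio.get_running_loop()
    # A custom pool limits concurrency to max_workers blocking calls at once.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        futs = [loop.run_in_executor(pool, blockme, n) for n in range(20)]
        # gather preserves submission order in its result list.
        return await asyncio.gather(*futs)

results = asyncio.run(main())
print(results)
```

With max_workers=4, only four requests are ever in flight simultaneously; the rest queue inside the pool.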

4dc9hkyq2#

Quoting directly from the Python documentation: this snippet from the asyncio library docs explains how to run blocking code concurrently with asyncio, creating tasks with the to_thread method.
You can find more information here - https://docs.python.org/3/library/asyncio-task.html#running-in-threads

import asyncio
import time

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")

asyncio.run(main())
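To get closer to the "stream" behavior the question asks for, the same to_thread approach can be combined with asyncio.as_completed, so each result is handled as soon as its request finishes rather than in submission order (a sketch under my own assumptions; blocking_io here is a stand-in for the real request function):

```python
import asyncio
import random
import time

def blocking_io(n):
    # Stand-in for a blocking API request.
    time.sleep(random.random() * 0.2)
    return n

async def main():
    coros = [asyncio.to_thread(blocking_io, n) for n in range(10)]
    done = []
    # as_completed yields each awaitable's result as soon as it is ready,
    # not in the order the requests were created.
    for coro in asyncio.as_completed(coros):
        done.append(await coro)
    return done

results = asyncio.run(main())
print(sorted(results))
```

The loop body runs once per finished request, which is effectively the "request N completes while later requests are still running" flow from the question.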
