此问题已在此处有答案:
FastAPI runs api-calls in serial instead of parallel fashion(2个答案)
How to avoid blocking the asyncio event loop with looping functions(1个答案)
7天前关闭
所以我有一个CPU限制的长时间运行的算法,我们称之为任务。假设它看起来像这样:
def task(parameters):
result = 0
for _ in range(10):
for _ in range(10):
for _ in range(10):
result += do_things()
return result
@app.get('/')
def results(parameters: BodyModel):
return task(parameters)
如果我将其封装在def
*path操作函数 * 中,一切都可以正常工作,因为它在不同的线程中启动。我可以访问多个路径等。并发是通过将我的CPU绑定任务推到一个单独的线程来完成它的工作。但我现在想切换到WebSockets,以传达中间结果。要做到这一点,我必须将整个事情标记为异步,并将WebSocket传递到我的任务中。所以它看起来像这样:
async def task(parameters):
result = 0
for _ in range(10):
for _ in range(10):
for _ in range(10):
intermediate_result = do_things()
await parameters.websocket.send_text(intermediate_result)
result += intermediate_result
return result
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
parameters = await websocket.receive_text()
parameters.websocket = websocket
result = await task(parameters)
await websocket.send_text(result)
它的工作原理就像一个魅力发送中间结果。但是现在我的算法阻塞了FastAPI,因为它本身并不是真正的异步。一旦我向'/ws'发布消息,FastAPI就会被阻止,并且在我的任务完成之前不会响应任何其他请求。
所以我需要一些建议
- a)要么从同步CPU绑定任务中发送WebSocket消息(我没有找到同步send_text替代方案),这样我就可以使用
def
,要么 - B)如何使我的CPU绑定真正异步,以便当我使用
async def
时,它不再阻止任何东西。
我试着使用here描述的ProcessPoolExecuter,但它不可能pickle一个协程,据我所知,我必须使我的任务成为一个协程(使用异步)才能在其中使用websocket.send_text()
。
此外,我还考虑将中间结果存储在某个地方,创建一个HTTP POST来启动我的任务,然后使用另一个WebSocket连接来读取和发送中间结果。但是我也可以类似地启动一个后台任务并实现一个常规的HTTP轮询机制。但我也不想,主要是因为我计划使用Google Cloud Run,当所有连接都关闭时,它会限制CPU。我认为最好的做法是教我的任务如何直接通过WebSocket进行通信。
我希望我的问题很清楚。这是我的第一个使用FastAPI和异步的大型项目,以前没有真正使用过AsyncIO。所以我可能漏掉了什么。谢谢你的建议。
1条答案
按热度按时间hk8txs481#
如果有人遇到这种情况,我将添加现在对我有效的解决方案。
我在跟踪这个:https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
关键是要使其不阻塞。例如,而不是:
我移动了await并将代码改为:
这样我就把我的cpu_bound的东西放在一个单独的线程中,我没有等待,一切都很好。
创建一个自定义线程池也是可行的,但是我们需要删除上下文管理器以使其非阻塞。
将变成:
从理论上讲,这同样适用于ProcessPoolExecutor,但需要做更多的工作,因为没有共享内存,我的终止条件不会像上面描述的那样工作。
是的,我知道cpu_bound的东西最好在不同的进程中完成,但是在我的情况下,将其移动到一个单独的线程并不起作用,我确实喜欢共享内存atm。