使FastAPI WebSockets的CPU绑定任务异步[duplicate]

anauzrmj  于 2023-06-23  发布在  其他
关注(0)|答案(1)|浏览(132)

此问题已在此处有答案

FastAPI runs api-calls in serial instead of parallel fashion(2个答案)
How to avoid blocking the asyncio event loop with looping functions(1个答案)
7天前关闭
所以我有一个CPU限制的长时间运行的算法,我们称之为任务。假设它看起来像这样:

def task(parameters):
  result = 0
  for _ in range(10):
    for _ in range(10):
      for _ in range(10):
        result += do_things()
  return result

@app.get('/')
def results(parameters: BodyModel):
    return task(parameters)

如果我将其封装在def *path操作函数 * 中,一切都可以正常工作,因为它在不同的线程中启动。我可以访问多个路径等。并发是通过将我的CPU绑定任务推到一个单独的线程来完成它的工作。但我现在想切换到WebSockets,以传达中间结果。要做到这一点,我必须将整个事情标记为异步,并将WebSocket传递到我的任务中。所以它看起来像这样:

async def task(parameters):
  result = 0
  for _ in range(10):
    for _ in range(10):
      for _ in range(10):
        intermediate_result = do_things()
        await parameters.websocket.send_text(intermediate_result)
        result += intermediate_result
  return result

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        parameters = await websocket.receive_text()
        parameters.websocket = websocket
        result = await task(parameters)
        await websocket.send_text(result)

它的工作原理就像一个魅力发送中间结果。但是现在我的算法阻塞了FastAPI,因为它本身并不是真正的异步。一旦我向'/ws'发布消息,FastAPI就会被阻止,并且在我的任务完成之前不会响应任何其他请求。
所以我需要一些建议

  • a)要么从同步CPU绑定任务中发送WebSocket消息(我没有找到同步send_text替代方案),这样我就可以使用def,要么
  • B)如何使我的CPU绑定真正异步,以便当我使用async def时,它不再阻止任何东西。

我试着使用here描述的ProcessPoolExecuter,但它不可能pickle一个协程,据我所知,我必须使我的任务成为一个协程(使用异步)才能在其中使用websocket.send_text()
此外,我还考虑将中间结果存储在某个地方,创建一个HTTP POST来启动我的任务,然后使用另一个WebSocket连接来读取和发送中间结果。但是我也可以类似地启动一个后台任务并实现一个常规的HTTP轮询机制。但我也不想,主要是因为我计划使用Google Cloud Run,当所有连接都关闭时,它会限制CPU。我认为最好的做法是教我的任务如何直接通过WebSocket进行通信。
我希望我的问题很清楚。这是我的第一个使用FastAPI和异步的大型项目,以前没有真正使用过AsyncIO。所以我可能漏掉了什么。谢谢你的建议。

hk8txs48

hk8txs481#

如果有人遇到这种情况,我将添加现在对我有效的解决方案。
我在跟踪这个:https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
关键是要使其不阻塞。例如,而不是:

# 1. Run in the default loop's executor:
result = await loop.run_in_executor(None, blocking_io)
print('default thread pool', result)

我移动了await并将代码改为:

# 1. Run in the default loop's executor:
thread = loop.run_in_executor(None, blocking_io)
print('default thread pool', result)
while True:
    asyncio.sleep(1)
    websocket.send_text('status updates...'
    if internal_logger.blocking_stuff_finished:
        break
result = await thread
websocket.send_text('result:', result)
websocket.close()

这样我就把我的cpu_bound的东西放在一个单独的线程中,我没有等待,一切都很好。
创建一个自定义线程池也是可行的,但是我们需要删除上下文管理器以使其非阻塞。

# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
    result = await loop.run_in_executor(pool, blocking_io)

将变成:

pool = concurrent.futures.ThreadPoolExecutor()
thread = loop.run_in_executor(pool, blocking_io)

从理论上讲,这同样适用于ProcessPoolExecutor,但需要做更多的工作,因为没有共享内存,我的终止条件不会像上面描述的那样工作。
是的,我知道cpu_bound的东西最好在不同的进程中完成,但是在我的情况下,将其移动到一个单独的线程并不起作用,我确实喜欢共享内存atm。

相关问题