fastapi,将长任务添加到缓冲区并逐个处理,同时保持服务器响应能力

c0vxltue  于 2021-09-29  发布在  Java
关注(0)|答案(0)|浏览(296)

我正在尝试建立一个fastapi服务器,它将接收一些生物数据作为输入,并对它们运行一些处理。由于处理占用了服务器的所有资源,因此应该按顺序处理查询。但是,服务器应该保持响应,并在缓冲区中添加更多请求。我一直在尝试使用backgroundtasks模块进行此操作,但是在发送第二个查询后,在任务运行时响应会延迟。感谢您的帮助,并提前表示感谢。

import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        os.makedirs(self.experiment_dir, exist_ok=False)

    def run(self):
        self.status = "running"
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"Currently {len(QUERY_BUFFER)} jobs in the backlog."}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题