python 从FastAPI中的磁盘重新加载worker状态

z0qdvdin  于 2023-04-28  发布在  Python
关注(0)|答案(1)|浏览(160)

阅读现有问题,我仍然不确定是否可以在FastAPI中重新加载所有Worker进程的状态。
Bellow是一个简化的应用程序草图:

some_dict = load_data(path)
server = Server(some_dict)
app = FastAPI()  
app.include_router(server.router)
uvicorn.run(app, ...)

服务器:

class Server:
   def __init__(self, some_dict: Dict[str, ...]):
       self.router = APIRouter()
       self.router.add_api_route("/bar", self.bar, methods=["POST"])
       self.some_dict = some_dict 

def bar(self, text: str):
    pass

是否有一种方法可以定期从磁盘重新加载这个字典,以便所有的工作者和所有的线程都可以安全地看到更改?带或不带FastAPI内置。
FastAPI后台任务是在每个worker中运行,还是在单个(单独)进程中运行?
我想避免重新加载服务器,而是构建单独的dict示例并替换如下所示的引用:

# this should be the only place to update state of some_dict
class Server:
    def reload(self,..):
        data = load_from_disk()
        some_dict = build_from_data(data)
        self.some_dict = some_dict

解决这个问题的最简单的例子是什么?如果工作人员短暂地不同步,也没关系
value对象很复杂,虽然它可以放在Redis中,但我正在寻找是否可以首先从FastAPI中实现。

yvfmudvl

yvfmudvl1#

与定期重新加载dict不同,您可能希望让它检查是否需要在您访问它时从磁盘重新加载数据。
我没有测试过这个,但理论是存在的。..

import os
import pickle
import time
from collections.abc import Mapping

class PeriodicallyReloadedReadonlyDict(Mapping):
    def __init__(self, filename: str, max_age: float = 60.0):
        self.filename = filename
        self.max_age = max_age
        self.last_loaded = None
        self.last_mtime = None
        self.state = None

    def _ensure_loaded(self):
        if self.state is None or time.time() - self.last_loaded > self.max_age:
            if not os.path.exists(self.filename):
                self.state = {}
            else:
                mtime = os.path.getmtime(self.filename)
                if mtime > self.last_loaded:
                    with open(self.filename, "rb") as f:
                        self.state = pickle.load(f)
                self.last_mtime = mtime
            self.last_loaded = time.time()

    def save(self) -> None:
        with open(self.filename, "wb") as f:
            pickle.dump(self.state, f)

    def __getitem__(self, key):
        self._ensure_loaded()
        return self.state[key]

    def __len__(self) -> int:
        self._ensure_loaded()
        return len(self.state)

    def __iter__(self):
        self._ensure_loaded()
        return iter(self.state)

相关问题