我试图创建一个使用websockets的fastapi应用程序,可以向所有连接的客户端广播消息。我发现这是不可能的websockets,但发现briliant库-socket.io。但是我不知道我如何使用它,并将其与我现有的fastapi应用程序集成。
xcitsw881#
# server.py from typing import Any import socketio import uvicorn from fastapi import FastAPI sio: Any = socketio.AsyncServer(async_mode="asgi") socket_app = socketio.ASGIApp(sio) app = FastAPI() @app.get("/test") async def test(): print("test") return "WORKS" app.mount("/", socket_app) # Here we mount socket app to main fastapi app @sio.on("connect") async def connect(sid, env): print("on connect") @sio.on("direct") async def direct(sid, msg): print(f"direct {msg}") # we can send message to specific sid await sio.emit("event_name", msg, room=sid) @sio.on("broadcast") async def broadcast(sid, msg): print(f"broadcast {msg}") await sio.emit("event_name", msg) # or send to everyone @sio.on("disconnect") async def disconnect(sid): print("on disconnect") if __name__ == "__main__": kwargs = {"host": "0.0.0.0", "port": 5000} kwargs.update({"debug": True, "reload": True}) uvicorn.run("server:app", **kwargs)
个字符最后安装适当的依赖项:
# server.py pip install python-socketio uvicorn fastapi # client.py pip install requests websocket-client
型
1条答案
按热度按时间xcitsw881#
个字符
最后安装适当的依赖项:
型