无法在fastapi中管理多个用户的WebSocket连接

mwkjh3gx  于 2023-10-20  发布在  其他
关注(0)|答案(1)|浏览(261)

我想在字典中存储和管理用户WebSocket连接,以便为每个用户创建自定义事件。

这里有一个非常简单的实现我想实现的目标:

from fastapi import FastAPI, WebSocket
app = FastAPI()
con = {}
@app.websocket("/ws/{id}")
async def websocket_endpoint(websocket: WebSocket, id: str):
    try:
        print("connect", id)
        await websocket.accept()
        con[id] = websocket
        print("check", con.keys())
        for uid in con:
            print(uid)
            await con[uid].send_text("hello")
    except Exception as e:
        print(e)

这是日志:

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [26605] using StatReload
INFO:     Started server process [26610]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
connect 1
INFO:     ('127.0.0.1', 48346) - "WebSocket /ws/1" [accepted]
check dict_keys(['1'])
1
INFO:     connection open
connect 2
INFO:     ('127.0.0.1', 48354) - "WebSocket /ws/2" [accepted]
check dict_keys(['1', '2'])
1
Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.
INFO:     connection open

当我用客户端1连接到WebSocket时,一切都按预期工作,我得到消息hello

客户端1日志:

CONNECTING: WS://LOCALHOST:8000/WS/1

CONNECTION READY

hello

CONNECTION CLOSED!

但是当seconds客户端连接到websocket时,它在服务器中返回一个异常:

Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.

客户端2没有收到任何消息:

CONNECTING: WS://LOCALHOST:8000/WS/1

CONNECTION READY

CONNECTION CLOSED!

我这样运行应用程序:

uvicorn main:app --reload

我做错了什么?

wrrgggsh

wrrgggsh1#

您的FastAPI WebSocket route似乎没有事件循环来连续处理每个连接的客户端的传入或传出消息。
含义:一旦函数websocket_endpoint完成执行,FastAPI会自动关闭WebSocket连接,导致意外的ASGI消息错误。
connections字典 * 放在函数作用域之外 * 将允许跨不同的函数调用存储和访问多个WebSocket连接。
您需要一个while True:循环来确保WebSocket连接保持打开状态,从而允许连续的消息发送和接收。
添加一些错误处理从来都不是一个坏主意:这将有助于通过WebSocketDisconnect优雅地处理客户端断开。这样,当客户端断开连接时,可以从connections字典中删除该连接。

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict

app = FastAPI()
connections: Dict[str, WebSocket] = {}

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    connections[client_id] = websocket

    try:
        while True:
            data = await websocket.receive_text()
            # Do something with received data (optional)
            
            for uid, ws in connections.items():
                await ws.send_text(f"Client {client_id} says: {data}")

    except WebSocketDisconnect:
        del connections[client_id]

例如,代码将来自任何客户端的传入消息发送到所有连接的客户端。您可以根据您的用例的需要自定义此设置。
新工作流程:

+---------------------+
   |  websocket_endpoint |
   +-------+-------------+
           |
           |
   +-------v-----------------+
   |await websocket.accept() |
   +-------+-----------------+
           |
           |
   +-------v----------------------------+
   | connections[client_id] = websocket |
   +-------+----------------------------+
           |
           |
           |<-------------------------------------------+
           |                                            |
   +-------v----------+                        +--------+------------+
   |   while True:    |                        | WebSocketDisconnect |
   +-------+----------+                        +--------+------------+
           |                                            |
           |                                            |
   +-------v-------------------------------+   +--------v-------------------+
   | data = await websocket.receive_text() |<--| del connections[client_id] |
   +-------+-------------------------------+   +----------------------------+
           |
           |
   +-------v-----------------------------+
   | for uid, ws in connections.items(): |
   +-------+-----------------------------+
           |
           |
   +-------v--------------+          +--------------------+
   | await ws.send_text() |---...--->|     while True:    |
   +-------+--------------+          +--------------------+

相关问题