FastAPI与集成的WebSocket客户端冻结,我应该如何合并这些?

cvxl0en2  于 2022-11-11  发布在  其他
关注(0)|答案(1)|浏览(134)

我目前在一个小型后端使用FastAPI。REST Api部分工作正常。下一步我需要查询一个WebSocket API并在FastAPI中处理它。我想过滤并转发传入的数据。当我启动应用程序时,websocket客户端开始运行,但实际的FastAPI在那之后就不工作了。
我怎样才能建立一个FastAPI,它可以额外的工作作为websocket客户端和服务器?
下面是我的代码:

from fastapi import FastAPI
from routers import foo
from services import bar as bar

app = FastAPI()

app.include_router(foo.router)

@app.on_event('startup')
async def startup_event():
    await bar.wh_client()

class wh_client:
def __init__(self, ):
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://xx.xx.xx.xx:xxxxx/",
                                     on_message=self.on_message,
                                     on_error=self.on_error,
                                     on_close=self.on_close)
    self.ws = ws
    self.ws.on_open = self.on_open
    self.ws.run_forever()

def on_message(self, message, stuff):
    print(message)
    return message

def on_error(self, error):
    return error

def on_close(self):
    print("### closed ###")

def run(self, *args):
    global driver
    driver = True
    while driver:
        try:
            time.sleep(1)
            self.ws.send('message')
        except KeyboardInterrupt:
            driver = False
    time.sleep(1)
    self.ws.close()
    print("thread terminating...")

def on_open(self, foo):
    thread.start_new_thread(self.run, ())
jhdbpxl9

jhdbpxl91#

我也遇到过同样的问题,但还没有在网上找到解决方案。我想出的解决方案看起来很愚蠢,但很有效。

import asyncio

# websocket endpoint / RX thread

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    ws_tx_thread = Thread(target=websocket_tx_task_wrapper, args = (websocket, to_ws_queue,))
    ws_tx_thread.start()

    while True:
        data = await websocket.receive_text()
        print(f"WS RX: {data}")

# tx task; sends data from queue

async def websocket_tx_task(ws, _q):
    print("Starting WS tx...")

    while True:            
        if(_q.empty() == False):
            cmd = _q.get()
            await ws.send_text(f"Msg: {cmd}")

    print("WS tx thread terminated")

def websocket_tx_task_wrapper(ws, _q):
    asyncio.run(websocket_tx_task(ws, _q))

RestAPI和WS通常可以共存。

相关问题