websocket 等待着一个永远无法解决的未来:set_result由事件发射器从后台协程/任务触发的回调调用

zpf6vheq  于 2023-04-21  发布在  其他
关注(0)|答案(1)|浏览(105)

我有一些与WebSocket连接通信的代码。我想向服务器发送一条消息,并返回一个future,一旦服务器发送结束信号,它将与服务器发送的数据一起解析。收集响应的代码应该在后台运行,以非阻塞的方式,收集响应,直到发送结束信号。这应该会触发未来来解决。其中的多个应该能够并发运行。
我使用的代码如下:

import asyncio
import websocket
from pyee.base import EventEmitter
import json

def done_callback(future):
    try:
        result = future.result()
    except Exception as exc:
        raise

class WebSocketResponder(EventEmitter):
    def __init__(self):
        super().__init__()
        return

    async def on_response(self, response):
        # Notify the callee that a payload has been received.
        print("reading response: ", response)
        if response != "end":
            print("got non end response: ", response)
            self.emit("data", response)
        else:
            print("emitting end")
            self.emit("end")
        return

class Processor:
    def __init__(self, wsApiUrl) -> None:
        self.wsApiUrl = wsApiUrl
        # websocket.enableTrace(True) # uncomment for detailed trace of websocket communications
        self.websocket = websocket.WebSocket()
        self.websocket.connect(self.wsApiUrl)
        async def on_message():
            while True:
                response = self.websocket.recv()
                await self.process(response)

        loop = asyncio.get_running_loop()
        self.receiving_coroutine = asyncio.run_coroutine_threadsafe(on_message(), loop)
        self.receiving_coroutine.add_done_callback(done_callback) # adds callback to task which raises exceptions occurring in coroutine
        self.responders = {}
        self.responder_count = 0
        return
    
    async def process(self, response):
        loaded_response = json.loads(response)
        id = loaded_response["id"]
        msg = loaded_response["msg"]
        responder = self.responders[id]
        await responder.on_response(msg)
        return

    async def send(self, msg):
        future = asyncio.Future()
        response = ""

        responder = WebSocketResponder()
        self.responders[self.responder_count] = responder

        self.websocket.send(json.dumps({"id": self.responder_count, "msg": msg}))
        
        self.responder_count += 1 # increment responder count
        @responder.on("data")
        def data_handler(payload):
            nonlocal response, future
            print("adding to response: ", payload)
            response += payload
            response += "\n"

        @responder.on("end")
        def end_handler():
            print("end handler triggered")
            nonlocal response, future
            print("setting result: ", response)
            future.set_result(response)

        return future

async def doTheThing():
    wsApiUrl = "ws://127.0.0.1:7890"

    processor = Processor(wsApiUrl)

    result = await processor.send("Hello, Server")
    return await result

async def main():
    result = await doTheThing()
    print("result: ", result)

if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

运行响应消息的WebSocket服务器的代码如下:

import websockets
import asyncio
import json

# Server data
PORT = 7890
print("Server listening on Port " + str(PORT))

# A set of connected ws clients
connected = set()

# The main behavior function for this server
async def echo(websocket, path):
    print("A client just connected")
    # Store a copy of the connected client
    connected.add(websocket)
    # Handle incoming messages
    try:
        async for message in websocket:
            print("Received message from client: " + message)
            loaded_message = json.loads(message)
            id = loaded_message["id"]
            msg_received = loaded_message["msg"]
            # Send a response to all connected clients except sender
            for conn in connected:
                if conn != websocket:
                    print("responding to another websocket")
                    await conn.send(json.dumps({"id": id, "msg": f"Someone said: {msg_received}"}))
                else:
                    print("responding to sender")
                    await conn.send(json.dumps({"id": id, "msg": f"Thanks for your message: {msg_received}"}))
                    print("sending more")
                    await conn.send(json.dumps({"id": id, "msg": "Do you get this?"}))
                    print("sending end")
                    await conn.send(json.dumps({"id": id, "msg": "end"}))
                    print("end sent")
    # Handle disconnecting clients 
    except websockets.exceptions.ConnectionClosed as e:
        print("A client just disconnected")
    finally:
        connected.remove(websocket)

# Start the server
start_server = websockets.serve(echo, "localhost", PORT)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

这是一个更复杂的系统的简化模型,WebSocket服务器是一个独立的服务,由我自己外部的第三方实现。
我创建了一个事件发射器类的示例,用于对来自WebSocket连接的响应做出React,然后发送一条消息,消息中包含此示例的id。
有一个couroutine / task,它运行一个应该永远运行的异步函数,它从WebSocket接收消息并运行一个异步函数来处理消息。这个处理函数检索事件发射器对象,并在事件发射器示例上运行响应函数来处理响应。这个事件发射器发射数据事件,它触发在send message调用中示例化的回调。并且一旦服务器发送结束信号,事件发射器就发出结束事件,该结束事件触发回调以设置由发送调用创建的未来的结果。
这段代码按预期运行,除了当end_handler被触发时,它会打印预期的消息,但future没有解析,await调用只是挂起。我希望这个问题与set_result函数在另一个线程/事件循环中运行有关,但我一直无法解决这个问题,尽管尝试了各种方法。

1cklez4t

1cklez4t1#

解决方案是删除手动循环分配,并将任务的创建移动到WebSocketResponder类中,因此每个响应者都有自己的任务运行自己的on_message函数,并且还确保on_message函数完成并在某个时候返回,一旦不再需要接收消息。
下面的代码按预期工作,接收并聚合来自服务器的响应,然后,一旦它从服务器接收到结束消息,它就会发出一个end事件并将responder对象上的complete标志设置为true。这会导致on_message函数返回,这允许receiving_task完成。end_handler然后运行并设置未来的结果。这使得doTheThing函数返回来自未来的结果。

import asyncio
import websocket
from pyee.base import EventEmitter
import json


class WebSocketResponder(EventEmitter):
    def __init__(self, id, websocket, process):
        super().__init__()

        self.id = id
        self.websocket = websocket
        self.process = process
        self.complete = False
        self.receiving_task = asyncio.create_task(self.on_message(), name=f"on_message for responder {self.id}")
        self.receiving_task.add_done_callback(self.done_callback) # adds callback to task which raises exceptions occurring in coroutine
        return
    
    def done_callback(self, future):
        try:
            result = future.result()
        except Exception as exc:
            raise

    async def on_message(self):
        while not self.complete:
            response = self.websocket.recv()
            await self.process(response)
        return

    async def on_response(self, response):
        # Notify the callee that a payload has been received.
        print("reading response: ", response)
        if response != "end":
            print("got non end response: ", response)
            self.emit("data", response)
        else:
            print("emitting end")
            self.emit("end")
            self.complete = True # set complete flag so self.receiving_task may complete
        return

class Processor:
    def __init__(self, wsApiUrl) -> None:
        self.wsApiUrl = wsApiUrl
        # websocket.enableTrace(True) # uncomment for detailed trace of websocket communications
        self.websocket = websocket.WebSocket()
        self.websocket.connect(self.wsApiUrl)
        self.responders = {}
        self.responder_count = 0
        return
    
    async def process(self, response):
        loaded_response = json.loads(response)
        id = loaded_response["id"]
        msg = loaded_response["msg"]
        responder = self.responders[id]
        await responder.on_response(msg)
        return

    async def send(self, msg):
        response = ""

        responder = WebSocketResponder(self.responder_count, self.websocket, self.process)
        future = asyncio.Future()
        self.responders[self.responder_count] = responder

        self.websocket.send(json.dumps({"id": self.responder_count, "msg": msg}))
        
        self.responder_count += 1 # increment responder count
        @responder.on("data")
        def data_handler(payload):
            nonlocal response, future
            print("adding to response: ", payload)
            response += payload
            response += "\n"

        @responder.on("end")
        def end_handler():
            print("end handler triggered")
            nonlocal response, future
            print("setting result: ", response)
            future.set_result(response)

        return future

async def doTheThing():
    wsApiUrl = "ws://127.0.0.1:7890"

    processor = Processor(wsApiUrl)

    result = await processor.send("Hello, Server")
    return await result

async def main():
    result = await doTheThing()
    print("result: ", result)

if __name__ ==  '__main__':
    asyncio.run(main())

相关问题