我有一些与WebSocket连接通信的代码。我想向服务器发送一条消息,并返回一个future,一旦服务器发送结束信号,它将与服务器发送的数据一起解析。收集响应的代码应该在后台运行,以非阻塞的方式,收集响应,直到发送结束信号。这应该会触发未来来解决。其中的多个应该能够并发运行。
我使用的代码如下:
import asyncio
import websocket
from pyee.base import EventEmitter
import json
def done_callback(future):
try:
result = future.result()
except Exception as exc:
raise
class WebSocketResponder(EventEmitter):
def __init__(self):
super().__init__()
return
async def on_response(self, response):
# Notify the callee that a payload has been received.
print("reading response: ", response)
if response != "end":
print("got non end response: ", response)
self.emit("data", response)
else:
print("emitting end")
self.emit("end")
return
class Processor:
def __init__(self, wsApiUrl) -> None:
self.wsApiUrl = wsApiUrl
# websocket.enableTrace(True) # uncomment for detailed trace of websocket communications
self.websocket = websocket.WebSocket()
self.websocket.connect(self.wsApiUrl)
async def on_message():
while True:
response = self.websocket.recv()
await self.process(response)
loop = asyncio.get_running_loop()
self.receiving_coroutine = asyncio.run_coroutine_threadsafe(on_message(), loop)
self.receiving_coroutine.add_done_callback(done_callback) # adds callback to task which raises exceptions occurring in coroutine
self.responders = {}
self.responder_count = 0
return
async def process(self, response):
loaded_response = json.loads(response)
id = loaded_response["id"]
msg = loaded_response["msg"]
responder = self.responders[id]
await responder.on_response(msg)
return
async def send(self, msg):
future = asyncio.Future()
response = ""
responder = WebSocketResponder()
self.responders[self.responder_count] = responder
self.websocket.send(json.dumps({"id": self.responder_count, "msg": msg}))
self.responder_count += 1 # increment responder count
@responder.on("data")
def data_handler(payload):
nonlocal response, future
print("adding to response: ", payload)
response += payload
response += "\n"
@responder.on("end")
def end_handler():
print("end handler triggered")
nonlocal response, future
print("setting result: ", response)
future.set_result(response)
return future
async def doTheThing():
wsApiUrl = "ws://127.0.0.1:7890"
processor = Processor(wsApiUrl)
result = await processor.send("Hello, Server")
return await result
async def main():
result = await doTheThing()
print("result: ", result)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
运行响应消息的WebSocket服务器的代码如下:
import websockets
import asyncio
import json
# Server data
PORT = 7890
print("Server listening on Port " + str(PORT))
# A set of connected ws clients
connected = set()
# The main behavior function for this server
async def echo(websocket, path):
print("A client just connected")
# Store a copy of the connected client
connected.add(websocket)
# Handle incoming messages
try:
async for message in websocket:
print("Received message from client: " + message)
loaded_message = json.loads(message)
id = loaded_message["id"]
msg_received = loaded_message["msg"]
# Send a response to all connected clients except sender
for conn in connected:
if conn != websocket:
print("responding to another websocket")
await conn.send(json.dumps({"id": id, "msg": f"Someone said: {msg_received}"}))
else:
print("responding to sender")
await conn.send(json.dumps({"id": id, "msg": f"Thanks for your message: {msg_received}"}))
print("sending more")
await conn.send(json.dumps({"id": id, "msg": "Do you get this?"}))
print("sending end")
await conn.send(json.dumps({"id": id, "msg": "end"}))
print("end sent")
# Handle disconnecting clients
except websockets.exceptions.ConnectionClosed as e:
print("A client just disconnected")
finally:
connected.remove(websocket)
# Start the server
start_server = websockets.serve(echo, "localhost", PORT)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
这是一个更复杂的系统的简化模型,WebSocket服务器是一个独立的服务,由我自己外部的第三方实现。
我创建了一个事件发射器类的示例,用于对来自WebSocket连接的响应做出React,然后发送一条消息,消息中包含此示例的id。
有一个couroutine / task,它运行一个应该永远运行的异步函数,它从WebSocket接收消息并运行一个异步函数来处理消息。这个处理函数检索事件发射器对象,并在事件发射器示例上运行响应函数来处理响应。这个事件发射器发射数据事件,它触发在send message调用中示例化的回调。并且一旦服务器发送结束信号,事件发射器就发出结束事件,该结束事件触发回调以设置由发送调用创建的未来的结果。
这段代码按预期运行,除了当end_handler被触发时,它会打印预期的消息,但future没有解析,await调用只是挂起。我希望这个问题与set_result函数在另一个线程/事件循环中运行有关,但我一直无法解决这个问题,尽管尝试了各种方法。
1条答案
按热度按时间1cklez4t1#
解决方案是删除手动循环分配,并将任务的创建移动到WebSocketResponder类中,因此每个响应者都有自己的任务运行自己的on_message函数,并且还确保on_message函数完成并在某个时候返回,一旦不再需要接收消息。
下面的代码按预期工作,接收并聚合来自服务器的响应,然后,一旦它从服务器接收到结束消息,它就会发出一个end事件并将responder对象上的complete标志设置为true。这会导致on_message函数返回,这允许receiving_task完成。end_handler然后运行并设置未来的结果。这使得doTheThing函数返回来自未来的结果。