websocket 为什么向Web Socket发送消息并不能控制事件循环?

jhkqcmku  于 2023-03-30  发布在  其他
关注(0)|答案(1)|浏览(182)

请看下面的代码:
main.py

import asyncio
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)
        print(message)

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

if __name__ == '__main__':
    asyncio.run(main())

other.py

import asyncio
import json

import websockets

tasks = set()

async def run_job(i):
    await asyncio.sleep(0.)
    print(f"I'm job number {i}")

async def bunch_of_tasks(ws):
    for i in range(10):
        task = asyncio.create_task(run_job(i), name=f'job-{i}')
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        print(f'had a nice sleep! now my value is {i}')
        # await asyncio.sleep(0.)
        await ws.send(json.dumps('hello there!'))
    await asyncio.gather(*tasks)
    print(f'tasks done')

async def do_stuff():
    async with websockets.connect("ws://localhost:8765") as websocket:
        await bunch_of_tasks(websocket)
        await websocket.recv()

if __name__ == '__main__':
    asyncio.run(do_stuff())

首先运行main.py,然后并行运行other.py,我得到:

had a nice sleep! now my value is 0
had a nice sleep! now my value is 1
had a nice sleep! now my value is 2
had a nice sleep! now my value is 3
had a nice sleep! now my value is 4
had a nice sleep! now my value is 5
had a nice sleep! now my value is 6
had a nice sleep! now my value is 7
had a nice sleep! now my value is 8
had a nice sleep! now my value is 9
I'm job number 0
I'm job number 1
I'm job number 2
I'm job number 3
I'm job number 4
I'm job number 5
I'm job number 6
I'm job number 7
I'm job number 8
I'm job number 9
tasks done

但是如果我在await ws.send(json.dumps('hello there!'))之前取消注解await asyncio.sleep(0.),我会得到:

had a nice sleep! now my value is 0
had a nice sleep! now my value is 1
I'm job number 0
had a nice sleep! now my value is 2
I'm job number 1
had a nice sleep! now my value is 3
I'm job number 2
had a nice sleep! now my value is 4
I'm job number 3
had a nice sleep! now my value is 5
I'm job number 4
had a nice sleep! now my value is 6
I'm job number 5
had a nice sleep! now my value is 7
I'm job number 6
had a nice sleep! now my value is 8
I'm job number 7
had a nice sleep! now my value is 9
I'm job number 8
I'm job number 9
tasks done

这也是我所期望的
所以很明显,发送消息到Web Socket并没有将控制权交给事件循环,并且run_job协程没有机会运行。但是,asyncio.sleep有效地挂起了当前任务,并为run_job提供了执行的机会。
为什么会这样?

arknldoa

arknldoa1#

TL;DR:确实-await本身不足以保证控制权被传递回asyncio循环的协作部分。为了确保这一点,必须在调用链的某个位置调度一个低级回调(asyncio.sleep就是这样做的)。

长回答:
我不得不调试出来--虽然websockets客户端的一切都是异步的,但它归结为一次写入它在Selector套接字中发送的所有数据--因为它是不受约束的。
换句话说,ws.send最终将同步调用这一行:https://github.com/python/cpython/blob/f2e5a6ee628502d307a97f587788d7022a200229/Lib/asyncio/selector_events.py#L1071
然后,最大的惊喜是对于原始协程(未 Package 在tasks或future中的协同例程),每当它们返回它们的值时,执行不会屈服于asyncio-loop -并且即使它们包含其他“await”语句,如果嵌套的等待的协同例程实际上不会在await中“阻塞”,则asyncio循环永远不会返回。每当协同例程在发送之后实际上“阻塞”时,调度回调:这个回调被 Package 在Python asyncio.events.Handle对象中,然后控制返回到asyncio循环。
代码在这个C函数中:https://github.com/python/cpython/blob/f2e5a6ee628502d307a97f587788d7022a200229/Modules/_asynciomodule.c#L2696。如果你沿着那里,你可以看到,如果协同例程返回一个值,它的结果是在低级对象中设置的。如果它从send返回另一个awaitable,那一个在循环中被调度,函数返回。
它可能是(并且很可能是)asyncio Loops的实现选择:出于性能原因,实际上同步解析的任何所谓的协同例程被同步运行。
只有当await中调用的任何嵌套的协同例程结束了在叶调用中使用回调调度未来时(asyncio.sleep这样做),asyncio默认循环才会运行所有其他就绪任务(asyncio.base_events.BaseEventLoop中另一个完整执行的._run_once方法)。

相关问题