websocket python:在pytest fixture中运行async例程的正确方法?

sqserrrh  于 2023-06-23  发布在  Python
关注(0)|答案(2)|浏览(135)

bounty还有4天到期。回答此问题可获得+200声望奖励。Vince希望引起更多关注这个问题。

下面的测试通过了,但我怀疑我是否正确使用了asyncio:

  • 代码混合了asyncio和线程
  • 测试正在通过,但从未退出(可能是因为“loop.run_until_complete”从未结束)
import asyncio
import threading
import pytest
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

def _run_server():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(websocket_server())
    loop.close()

@pytest.fixture
def run_server():
    thread = threading.Thread(target=_run_server)
    thread.start()
    yield thread
    # no idea how to stop the loop here
    thread.join()

@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

(note:为了停止循环,我尝试了这里提出的解决方案(How to stop websocket server created with websockets.serve()?),但这导致服务器无法启动)

kgqe7b3p

kgqe7b3p1#

在运行服务器的线程中需要一些其他代码来接收来自主线程的信号并自行关闭。
幸运的是,由于asyncio的特性,这个控件可以构建在一个单独的函数中,而不会干扰实现服务器本身的函数。只有创建循环并调用服务器任务的函数必须安排一些代码来检查来自另一个线程的信号到达,在另一个任务中- asyncio将负责两个任务轮流运行。
跨线程通信的正确方法是使用队列-尽管在这种情况下,即使是模块级(全局)变量也可以工作。请注意,即使存在“异步队列”--在本例中,我们希望将消息从一个线程发送到另一个线程,并且没有两个异步任务试图并行读取它,因此我们在queue模块中使用“传统”多线程Queue类。
同样,不相关,但我将启动asyncio循环的代码更改为新的方式,使用asyncio.run,没有第一个Python版本中所需的所有样板。

import asyncio
import threading
import pytest
import websockets
from queue import Queue, Empty

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

async def coordinate(q):
    server = asyncio.create_task(websocket_server())
    while True:
        await asyncio.sleep(0)  # this is necessary to allow the asyncio loop to switch tasks.
        try:
            q.get_nowait()
        except Empty:
            pass
        else:  # block will run whenever there is _any_ message in the queue.
            server.cancel()
            return
    server.cancel()

def _run_server(q):
    asyncio.run(coordinate(q))

@pytest.fixture
def run_server():
    command  = Queue()
    thread = threading.Thread(target=_run_server, args=(command,))
    thread.start()
    yield thread
    command.put("quit")
    thread.join()

@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

第二种方法不需要服务器线程中的消息监视代码,只需从运行测试的线程调用取消服务器任务。Asyncio在调用loop.call_soon_threadsafe时对此有一个预见--我们只需要在原始线程中引用循环和服务器任务(这样我们就可以得到它的.cancel方法)--这可以通过模块级(全局)变量来完成。“run_server”函数不会返回,因此需要全局变量,因为它们的值可以在设置后立即在父线程中检查。否则,如果由于它们的全局状态而不想求助于它们,那么线程队列也可以用于将“loop”和“server”对象从子线程发布到fixture代码。使用全局变量会妨碍测试正确地并行运行。

import asyncio
import threading
import pytest
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

def _run_server():
    global loop, server
    loop = asyncio.new_event_loop()
    server = loop.create_task(websocket_server())
    try:
        loop.run_until_complete(server)
    except asyncio.CancelledError:
        pass
    loop.close()

@pytest.fixture
def run_server():
    thread = threading.Thread(target=_run_server)
    thread.start()
    yield thread
    loop.call_soon_threadsafe(server.cancel)
    thread.join()

@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

这一次我们需要一个对asyncio循环对象本身的显式引用,所以我们不调用asyncio.run,而是执行“create_loop”,“run_until_complete”调用。
(感谢您提供完整的,独立的,可执行的,最小的例子-没有它,我就不会花时间在这个问题上)

rjjhvcjd

rjjhvcjd2#

解决这个问题的一个简单方法是强制线程在超时后终止。

import asyncio
import threading
import pytest
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

def _run_server():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(websocket_server())
    loop.close()

@pytest.fixture
def run_server():
    thread = threading.Thread(target=_run_server)
    thread.start()
    yield thread
    # no idea how to stop the loop here
    thread.join(timeout=3) # <--- set timeout for thread so it doesn't run forever.

@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

不是非常优雅,但如果服务器正在运行,测试将永远不会终止,直到夹具终止

相关问题