bounty还有4天到期。回答此问题可获得+200声望奖励。Vince希望引起更多关注这个问题。
下面的测试通过了,但我怀疑我是否正确使用了asyncio:
- 代码混合了asyncio和线程
- 测试正在通过,但从未退出(可能是因为“loop.run_until_complete”从未结束)
import asyncio
import threading
import pytest
import websockets
async def echo(websocket):
async for message in websocket:
await websocket.send(message)
async def websocket_server():
async with websockets.serve(echo, "localhost", 8765):
await asyncio.Future()
def _run_server():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(websocket_server())
loop.close()
@pytest.fixture
def run_server():
thread = threading.Thread(target=_run_server)
thread.start()
yield thread
# no idea how to stop the loop here
thread.join()
@pytest.mark.asyncio
async def test_websocket(run_server):
async with websockets.connect("ws://localhost:8765") as websocket:
await websocket.send("Hello!")
response = await websocket.recv()
assert response == "Hello!"
(note:为了停止循环,我尝试了这里提出的解决方案(How to stop websocket server created with websockets.serve()?),但这导致服务器无法启动)
2条答案
按热度按时间kgqe7b3p1#
在运行服务器的线程中需要一些其他代码来接收来自主线程的信号并自行关闭。
幸运的是,由于asyncio的特性,这个控件可以构建在一个单独的函数中,而不会干扰实现服务器本身的函数。只有创建循环并调用服务器任务的函数必须安排一些代码来检查来自另一个线程的信号到达,在另一个任务中- asyncio将负责两个任务轮流运行。
跨线程通信的正确方法是使用队列-尽管在这种情况下,即使是模块级(全局)变量也可以工作。请注意,即使存在“异步队列”--在本例中,我们希望将消息从一个线程发送到另一个线程,并且没有两个异步任务试图并行读取它,因此我们在
queue
模块中使用“传统”多线程Queue
类。同样,不相关,但我将启动asyncio循环的代码更改为新的方式,使用
asyncio.run
,没有第一个Python版本中所需的所有样板。第二种方法不需要服务器线程中的消息监视代码,只需从运行测试的线程调用取消服务器任务。Asyncio在调用
loop.call_soon_threadsafe
时对此有一个预见--我们只需要在原始线程中引用循环和服务器任务(这样我们就可以得到它的.cancel
方法)--这可以通过模块级(全局)变量来完成。“run_server”函数不会返回,因此需要全局变量,因为它们的值可以在设置后立即在父线程中检查。否则,如果由于它们的全局状态而不想求助于它们,那么线程队列也可以用于将“loop”和“server”对象从子线程发布到fixture代码。使用全局变量会妨碍测试正确地并行运行。这一次我们需要一个对asyncio循环对象本身的显式引用,所以我们不调用
asyncio.run
,而是执行“create_loop”,“run_until_complete”调用。(感谢您提供完整的,独立的,可执行的,最小的例子-没有它,我就不会花时间在这个问题上)
rjjhvcjd2#
解决这个问题的一个简单方法是强制线程在超时后终止。
不是非常优雅,但如果服务器正在运行,测试将永远不会终止,直到夹具终止