测试从常规函数调用python协程(async def)

5ktev3wc  于 2022-11-19  发布在  Python
关注(0)|答案(2)|浏览(186)

假设我有一个asyncio协程,它获取一些数据并返回。

async def fetch_data(*args):
  result = await some_io()
  return result

基本上,这个协程是从协程链中调用的,初始协程是通过创建一个任务来运行的。但是,如果为了测试的目的,我只想在运行某个文件时运行一个协程,该怎么办?

if __name__ == '__main__':
  result = await fetch_data(*args)
  print(result)

显然我不能这么做,因为我正在运行并等待非协程函数的协程,所以问题是:有没有什么正确方法可以从协程中获取数据而不调用它的函数?我可以为result创建一些Future对象并等待它,但也许还有其他一些更简单、更清晰的方法?

nue99wik

nue99wik1#

你需要创建一个事件循环来运行你的协程:

import asyncio

async def async_func():
    return "hello"

loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_func())
loop.close()

print(result)

或作为函数:

def run_coroutine(f, *args, **kwargs):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(f(*args, **kwargs))
    loop.close()
    return result

这样使用:

print(run_coroutine(async_func))

或者:

assert "expected" == run_coroutine(fetch_data, "param1", param2="foo")
72qzrwbm

72qzrwbm2#

对于Python 3.7+,可以使用asyncio.run()

import asyncio

async def fetch_data():
  return "sample_data"

asyncio.run(fetch_data())

相关问题