pythonic永远运行一个pythonio事件循环的方法是什么?

4ngedf3f  于 2023-11-20  发布在  Python
关注(0)|答案(4)|浏览(108)

从文档中看,推荐的kickstart applications的方法似乎是使用asyncio.run(),所以我的应用程序看起来像这样:

async def async_main():
    # Everything here can use asyncio.create_task():
    o = ObjectThatMustBeKeptReferenced()
    create_tasks_and_register_callbacks(o)

    # Wait forever, ugly:
    while True:
        await asyncio.sleep(10000)

asyncio.run(async_main())

字符串
async_main()末尾的无限循环感觉非常错误。在其他语言中,我会永远调用事件循环。所以我尝试了这个:

def main():
    loop = asyncio.get_event_loop()

    # Everything here can use asyncio.create_task():
    o = ObjectThatMustBeKeptReferenced()
    create_tasks_and_register_callbacks(o)

    # Wait forever, pretty:
    loop.run_forever()

main()


这里的问题是,当我在函数内部调用asyncio.create_task()时,这将失败,并出现RuntimeError: no running event loop错误,即使事件循环已经创建并注册在线程上。
pythonic是什么,一种在asyncio事件循环中永远休眠的方法?

c8ib6hqw

c8ib6hqw1#

您可以简单地将sleep循环更改为从不设置的ad-hoc事件:

# wait forever
await asyncio.Event().wait()

字符串
如果需要,您可以轻松地修改它,将事件存储到变量中,并将其作为关闭信号传播。
另一种选择是让创建任务的函数 * 返回 * 它创建的任务,在这种情况下,即使(或者正是因为)它们永远不会完成,你也可以等待它们:

async def async_main():
    o = ObjectThatMustBeKeptReferenced()
    tasks = create_tasks_and_register_callbacks(o)
    # wait forever, or until a task raises
    await asyncio.gather(*tasks)


虽然这并没有清楚地传达永远循环的意图,但它的优点是,如果任何任务引发未处理的异常,它将暂停程序(并将异常传播到调用asyncio.run(async_main())的代码)。

ct3nt3jp

ct3nt3jp2#

Infinite while True对于基于asyncio的脚本来说是相当pythonic的。它代表程序的 main 循环。这将脚本逻辑从低级 * 事件循环 * 中分离出来,并允许您从应用程序逻辑的Angular 控制程序的生命周期。执行一些周期性的操作和检查。
这样的循环可以放在其他模块和类中,这意味着长期的周期性工作。这与asyncio是一致的,因为它使用 * 协作多任务 *,这样的循环给予控制event loop(例如通过await asyncio.sleep)。
asyncio.run更可取,因为它仍然归结为run_forever的调用,但在退出脚本的主协程时,它会取消并清除正在运行的任务、异步生成器等。
无限循环的真示例子:
Uvicorn

#...
    async def main_loop(self):
        counter = 0
        should_exit = await self.on_tick(counter)
        while not should_exit:
            counter += 1
            counter = counter % 864000
            await asyncio.sleep(0.1)
            should_exit = await self.on_tick(counter)

字符串
aiohttp

#...
        # sleep forever by 1 hour intervals,
        # on Windows before Python 3.8 wake up every 1 second to handle
        # Ctrl+C smoothly
        if sys.platform == "win32" and sys.version_info < (3, 8):
            delay = 1
        else:
            delay = 3600

        while True:
            await asyncio.sleep(delay)
    finally:
        await runner.cleanup()

rqdpfwrv

rqdpfwrv3#

我过去做这件事的方法是使用asyncio.async,这是asyncip.ensure_future的一个不推荐的别名。人们面临的最大问题是,他们会在没有运行循环的情况下调用ensure_future,这不能确保任何事情,所以越高-添加了一个级别的API asyncio.create_task,以Assert存在一个正在运行的循环,以便任务实际运行。task然后永远运行循环,这样ensure_future的底层API就是您想要的:

async def async_main():
    # Everything here can use asyncio.create_task():
    o = ObjectThatMustBeKeptReferenced()
    create_tasks_and_register_callbacks(o)
    
asyncio.ensure_future(async_main())
asyncio.get_event_loop().run_forever()

字符串
这基本上表明,当循环运行时,我们确保async_main将执行,但与asyncio.run不同的是,我们让循环永远运行,而不仅仅是直到它完成主入口点。
但是你可以做得更好,一个给定的任务是否会永远运行并不重要,理想情况下,你应该跟踪它们,并在所有这些任务上使用你的函数await,如果其中一些任务永远运行,那么你的函数也会永远运行。但是如果没有,那么一旦所有的BRAC任务都退出了,程序也会退出。create_task已运行,pass them all to an asyncio.gather as @user4815162342已完成。
类不必公开它们的派生任务来完成这一点,只需提供一个方法来等待它们,这是一个示例来说明这个想法:

class NetworkAsyncHandler():
    def __init__(self, some_data):
        # don't just create floating tasks, keep a record of them
        self._floating_coroutines = []
        self.remembered_data = some_data
        self.connect_to_some_server_idk_this_is_is_an_example("localhost")
    def connect_to_some_server_idk_this_is_is_an_example(self, host):
        # ALWAYS KEEP A REFERENCE TO THE FUTURES YOU CREATE
        a = asyncio.create_task(DO_STUFF(host, self.remembered_data))
        # assuming you were never explicitly awaiting on a yourself, then just add it to the list
        self._floating_coroutines.append(a)

    async def join(self):
        """awaits on all floating coroutines created"""
        await asyncio.gather(*self._floating_coroutines)

async def async_main():
    a = NetworkAsyncHandler("A")
    b = NetworkAsyncHandler("B")
    # this function will wait on all created subroutines and return only when all
    # sub routines end.  If some run forever then so do we.
    await asyncio.gather(a.join(), b.join())
    
# now this works exactly as you would want it to.
asyncio.run(async_main())


你可以想象,通常这个NetworkAsyncHandler可能永远运行,但可能有一个信号,它可以从服务器接收到关闭,这导致它返回,如果两个服务器都收到这个信号,你会希望Python程序完成。第一种情况下,你调用run_forever这不会发生,实际上你甚至不需要知道例程是否还在运行,但是在这个架构中,每个函数都在等待它实际依赖的例程,并且asyncio.run完全按照它的意图去做。

uttx8gqw

uttx8gqw4#

pythonic是什么,一种永远睡在pythcio事件循环上的方法?
asyncio.run是一个更高级别的API,通常是运行事件循环的首选方式。不过,使用较低级别的run_forever也没有什么错。
这里的问题是,当我调用asyncio.create_task()时,这将失败,并出现RuntimeError: no running event loop错误
由于create_task无法获取正在运行的事件循环,因此这不起作用。幸运的是,还有一个关于循环的create_task方法。您需要更新create_tasks_and_register_callbacks以接受循环作为参数

def create_tasks_and_register_callbacks(obj, loop):

字符串
然后将其定义中对asyncio.create_task的任何引用更改为loop.create_task

相关问题