python-3.x 为什么__aexit__在内部有await时没有完全执行?

brccelvz  于 2023-04-08  发布在  Python
关注(0)|答案(4)|浏览(197)

这是我的代码的简化版本:
main是在第二次迭代后停止的协程。
get_numbers是一个异步生成器,它生成数字,但在异步上下文管理器中。

import asyncio

class MyContextManager:
    async def __aenter__(self):
        print("Enter to the Context Manager...")
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        print(exc_type)
        print("Exit from the Context Manager...")
        await asyncio.sleep(1)
        print("This line is not executed")  # <-------------------
        await asyncio.sleep(1)

async def get_numbers():
    async with MyContextManager():
        for i in range(30):
            yield i

async def main():
    async for i in get_numbers():
        print(i)
        if i == 1:
            break

asyncio.run(main())

输出为:

Enter to the Context Manager...
0
1
<class 'asyncio.exceptions.CancelledError'>
Exit from the Context Manager...

我其实有两个问题:
1.根据我的理解,AsyncIO在事件循环的下一个循环中安排了一个Task,并给__aexit__一个执行的机会。但是print("This line is not executed")这一行没有执行。为什么呢?假设如果我们在__aexit__中有一个await语句,这行之后的代码根本不会执行,我们不应该依靠它来清理?
1.异步生成器上的help()的输出显示:

|  aclose(...)
 |      aclose() -> raise GeneratorExit inside generator.

那么为什么我在__aexit__中得到<class 'asyncio.exceptions.CancelledError'>异常呢?

  • 使用Python 3.10.4
fhg3lkii

fhg3lkii1#

这并不是针对__aexit__,而是针对所有异步代码:当一个事件循环关闭时,它必须在 * 取消 * 剩余任务或 * 保留 * 它们之间做出决定。为了清理,大多数框架更喜欢取消而不是依赖于程序员稍后清理保留的任务。
这种关闭清理是一种独立的机制,与正常执行期间调用堆栈上函数、上下文等的优雅展开不同。* 在取消期间也必须清理的上下文管理器必须专门为此做好准备 *。尽管如此,在许多情况下,不为此做准备也是可以的,因为许多资源本身是故障安全的。
在当代的事件循环框架中,通常有三个级别的清理:

  • 展开:当作用域结束时,__aexit__被调用,并且可能会收到一个触发展开的异常作为参数。清理预计会根据需要延迟很长时间。这与运行同步代码的__exit__相当。
  • 取消:__aexit__可能会接收CancelledError 1作为参数 * 或任何await/async for/async with * 上的异常。清理可能会延迟此操作,但预计会尽可能快地进行。这相当于KeyboardInterrupt取消同步代码。
  • 关闭:__aexit__可能会接收GeneratorExit作为参数 * 或作为任何await/async for/async with * 上的异常。清理必须尽快进行。这与GeneratorExit关闭同步发电机类似。

要处理取消/关闭,任何async代码(无论是在__aexit__中还是在其他地方)都必须处理CancelledErrorGeneratorExit。虽然前者可能会被延迟或抑制,但后者应该立即同步处理2。

async def __aexit__(self, exc_type, exc_value, exc_tb):
        print("Exit from the Context Manager...")
        try:
            await asyncio.sleep(1)  # an exception may arrive here
        except GeneratorExit:
            print("Exit stage left NOW")
            raise
        except asyncio.CancelledError:
            print("Got cancelled, just cleaning up a few things...")
            await asyncio.sleep(0.5)
            raise
        else:
            print("Nothing to see here, taking my time on the way out")
            await asyncio.sleep(1)
  • 注意 *:通常不可能彻底处理这些情况。不同形式的清理可能会相互中断,例如取消展开然后关闭。处理清理只能在尽力而为的基础上进行;通过故障安全(例如经由事务)而不是显式清除来实现鲁棒清除。

具体来说,异步生成器的清理是一个棘手的问题,因为它们可以一次被所有情况清理:当生成器完成时展开,当拥有任务被销毁时取消,或者当生成器被垃圾收集时关闭。* 清除信号到达的顺序取决于实现。*
解决这个问题的正确方法不是首先依赖于隐式清理。相反,每个协程都应该确保在父进程退出之前关闭其所有子进程资源。值得注意的是,异步生成器可能持有资源并需要关闭。

async def main():
    # create a generator that might need cleanup
    async_iter = get_numbers()
    async for i in async_iter:
        print(i)
        if i == 1:
            break
    # wait for generator clean up before exiting
    await async_iter.aclose()

在最近的版本中,此模式通过aclosing上下文管理器进行编码。

from contextlib import aclosing

async def main():
    # create a generator and prepare for its cleanup
    async with aclosing(get_numbers()) as async_iter:
        async for i in async_iter:
            print(i)
            if i == 1:
                break

1此例外的名称和/或标识可能会有所不同。
2虽然在GeneratorExit期间可以await异步操作,但它们可能不会屈服于事件循环。同步接口有利于强制执行这一点。

balp4ylt

balp4ylt2#

我不知道发生了什么,但张贴我发现的情况下,它证明是有用的其他人决定调查.当我们存储引用get_numbers()外部main()的输出变化为预期.我会说,它似乎get_numbers()是垃圾收集到早期,但禁用gc没有帮助,所以我的猜测可能是错误的.

import asyncio

test = None

class MyContextManager:
    async def __aenter__(self):
        print("Enter to the Context Manager...")
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        print(exc_type)
        print("Exit from the Context Manager...")
        await asyncio.sleep(1)
        print("This line is not executed")  # <-- Executed now
        await asyncio.sleep(1)

async def get_numbers():
    async with MyContextManager():
        for i in range(30):
            yield i

async def main():
    global test
    test = get_numbers()

    async for i in test:
        print(i)
        if i == 1:
            break

asyncio.run(main())
zpgglvta

zpgglvta3#

答案很简单解释器将在一秒后继续执行__aexit__,但是main函数完成并且没有指向上下文管理器的指针。
你提到的第一个明显的解决方案是在main函数之后等待足够长的时间:

async def main():
    async for i in get_numbers():
        print(i)
        if i == 1:
            break
    await asyncio.sleep(4)   #  <---- New

另一种方法是使用try/finally:

async def __aexit__(self, exc_type, exc_value, exc_tb):
        try:
            pass
            print(exc_type)
            print("Exit from the Context Manager...")
            await asyncio.sleep(1)
        finally:
            print("This line is not executed")  # <-------------------
r7s23pms

r7s23pms4#

回答第一个问题:
假设如果我们在__aexit__中有一个await语句,那么该行之后的代码根本不会执行,这是正确的吗?
我会说不,并不总是这样。只要main有足够的时间,并且可以再次将控制权传递回事件循环,__aexit__内部的代码就可以执行。我试过这样做:

async def main():
    async for i in get_numbers():
        print(i)
        if i == 1:
            break
    await asyncio.sleep(4)   #  <---- New

.run()只关心传递给它的协程并将其运行到最后,而不关心包括__aexit__在内的其他协程。因此,如果它没有足够的时间或没有将控制权传递给事件循环,我不能依赖第一个await语句之后的下一行。
可能有帮助的其他信息:
在base_events.py/run_forever方法(由.run()调用)中,我发现self._asyncgen_finalizer_hook被传递给sys.set_asyncgen_hooks_asyncgen_finalizer_hook的主体是:

def _asyncgen_finalizer_hook(self, agen):
        self._asyncgens.discard(agen)
        if not self.is_closed():
            self.call_soon_threadsafe(self.create_task, agen.aclose())

但是call_soon_threadsafe的实现是空的。
稍后我将整理这个答案并删除这些猜测。

相关问题