Python: how to prevent an asyncio.Task from being cancelled

Posted by 5jdjgkvh on 2023-05-21 in Python

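I'm implementing a graceful shutdown that needs to wait for certain tasks to finish executing before the application shuts down. In the shutdown handler I wait for the tasks with asyncio.gather(*asyncio.Task.all_tasks()).
The problem is that the tasks I need to wait for get cancelled as soon as I kill the application, so they no longer show up in asyncio.Task.get_all(). How can I prevent that?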

myzjeezk #1

  • Note: asyncio.Task.all_tasks() is deprecated (and was removed in Python 3.9); this answer refers to asyncio.all_tasks() instead.
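
As a quick illustration of the replacement call, here is a minimal sketch. The names `napper` and `count_tasks` are made up for this example. It shows that asyncio.all_tasks() returns the not-yet-finished tasks of the running loop, including the task that calls it.

```python
import asyncio


async def napper():
    """Hypothetical child task that just sleeps briefly."""
    await asyncio.sleep(0.1)


async def count_tasks():
    children = [asyncio.create_task(napper()) for _ in range(2)]

    # asyncio.all_tasks() needs a running loop when called without arguments.
    tasks = asyncio.all_tasks()
    includes_self = asyncio.current_task() in tasks

    await asyncio.gather(*children)
    return len(tasks), includes_self


if __name__ == "__main__":
    # Two children plus the main task created by asyncio.run().
    print(asyncio.run(count_tasks()))
```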

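TL;DR: demo code

The solution differs by operating system.

  • *nix: terminate by sending SIGINT
  • Windows: terminate by pressing Ctrl+C

Each task runs for 10 seconds, so terminate the process before the tasks finish.

Pure asyncio (*nix only)

Complex, long, and reinventing the wheel. It adds a custom signal handler to keep the error from propagating.
The demo spawns 3 shielded and 3 unshielded tasks: the former run to completion, the latter get cancelled.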

"""
Task shielding demonstration with pure asyncio, nix only
"""
import asyncio
import signal
import os

# Sets of tasks we shouldn't cancel
REQUIRE_SHIELDING = set()

async def work(n):
    """Some random io intensive work to test shielding"""
    print(f"[{n}] Task start!")
    try:
        await asyncio.sleep(10)

    except asyncio.CancelledError:
        # we shouldn't see following output
        print(f"[{n}] Canceled!")
        return

    print(f"[{n}] Task done!")

def install_handler():

    def handler(sig_name):
        print(f"Received {sig_name}")

        # distinguish what to await and what to cancel. We'll have to await all,
        # but we only have to manually cancel subset of it.
        to_await = asyncio.all_tasks()
        to_cancel = to_await - REQUIRE_SHIELDING

        # cancel tasks that don't require shielding
        for task in to_cancel:
            task.cancel()

        print(f"Cancelling {len(to_cancel)} out of {len(to_await)}")

    loop = asyncio.get_running_loop()

    # install for SIGINT and SIGTERM
    for signal_name in ("SIGINT", "SIGTERM"):
        loop.add_signal_handler(getattr(signal, signal_name), handler, signal_name)

async def main():
    print(f"PID: {os.getpid()}")

    # If main task is done - errored or not - all other tasks are canceled.
    # So we need to shield main task.
    REQUIRE_SHIELDING.add(asyncio.current_task())

    # install handler
    install_handler()

    # spawn tasks that will be shielded
    for n in range(3):
        REQUIRE_SHIELDING.add(asyncio.create_task(work(n)))

    # spawn tasks that won't be shielded, for comparison
    for n in range(3, 6):
        asyncio.create_task(work(n))

    # we'll need to keep main task alive just until tasks are done, excluding self.
    await asyncio.gather(*(REQUIRE_SHIELDING - {asyncio.current_task()}))

asyncio.run(main())

PID: 10778
[0] Task start!
[1] Task start!
[2] Task start!
[3] Task start!
[4] Task start!
[5] Task start!
Received SIGINT
Cancelling 3 out of 7
[3] Canceled!
[5] Canceled!
[4] Canceled!
[0] Task done!
[1] Task done!
[2] Task done!

asyncio + aiorun (all OSes)

Demonstrates the same thing as above.

"""
Task shielding demonstration with asyncio + aiorun, all OS
"""
import asyncio
import os

from aiorun import run, shutdown_waits_for

async def work(n):
    """Some random io intensive work to test shielding"""
    print(f"[{n}] Task start!")
    try:
        await asyncio.sleep(10)

    except asyncio.CancelledError:
        print(f"[{n}] Canceled!")
        return

    print(f"[{n}] Task done!")

async def main():
    print(f"PID: {os.getpid()}")
    child_tasks = []

    # spawn tasks that will be shielded
    child_tasks.extend(
        asyncio.create_task(shutdown_waits_for(work(n))) for n in range(3)
    )

    # spawn tasks without shielding for comparison
    child_tasks.extend(asyncio.create_task(work(n)) for n in range(3, 6))

    # aiorun runs forever by default, even without any coroutines left to run.
    # We'll have to manually stop the loop, but can't use asyncio.all_tasks()
    # check as aiorun's internal tasks included in it run forever.
    # instead, keep child task spawned by main task and await those.
    await asyncio.gather(*child_tasks)
    asyncio.get_running_loop().stop()

run(main())

PID: 26548
[0] Task start!
[1] Task start!
[2] Task start!
[3] Task start!
[4] Task start!
[5] Task start!
Stopping the loop
[4] Canceled!
[5] Canceled!
[3] Canceled!
[1] Task done!
[0] Task done!
[2] Task done!

Switching to trio (all OSes)

A pure-Python asynchronous event loop built from the ground up, without callback soup.

"""
Task shielding demonstration with trio, all OS
"""
import os

import trio

async def work(n):
    """Some random io intensive work to test shielding"""
    print(f"[{n}] Task start!")
    try:
        await trio.sleep(10)

    except trio.Cancelled:
        print(f"[{n}] Canceled!")
        raise

    print(f"[{n}] Task done!")

async def shielded():
    # opening explicit concurrency context.
    # Every concurrency in trio is explicit, via Nursery that takes care of tasks.
    async with trio.open_nursery() as nursery:

        # shield nursery from cancellation. Now all tasks in this scope is shielded.
        nursery.cancel_scope.shield = True

        # spawn tasks
        for n in range(3):
            nursery.start_soon(work, n)

async def main():
    print(f"PID: {os.getpid()}")

    try:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(shielded)

            for n in range(3, 6):
                nursery.start_soon(work, n)

    except (trio.Cancelled, KeyboardInterrupt):
        # Nursery always make sure all child tasks are done - either canceled or not.
        # This try-except is just here to suppress traceback. Not quite required.
        print("Nursery Cancelled!")

trio.run(main)

PID: 23684
[3] Task start!
[4] Task start!
[5] Task start!
[0] Task start!
[1] Task start!
[2] Task start!
[3] Canceled!
[4] Canceled!
[5] Canceled!
[0] Task done!
[1] Task done!
[2] Task done!
Nursery Cancelled!

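Below is a slightly deeper ramble on how asyncio's signal handling flows.

Signal handling in pure asyncio

I spent a whole day digging into this: tracing, searching, reading source code. I still couldn't piece together a complete flow, so the flow below is my best guess.

Without a custom signal handler

1. SIGINT is received
2. Somehow signal._signal.default_int_handler gets called, which raises KeyboardInterrupt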

# signal/_signal.py - probably C code
def default_int_handler(*args, **kwargs): # real signature unknown
    """
    The default handler for SIGINT installed by Python.
    
    It raises KeyboardInterrupt.
    """
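
The stub above can be checked directly without sending a real signal. This is a minimal sketch that calls the public `signal.default_int_handler` by hand with a signal number and a dummy frame; the helper name `sigint_default_raises` is made up for this example.

```python
import signal


def sigint_default_raises():
    """Call Python's default SIGINT handler by hand and report what it does."""
    try:
        # Same arguments a real delivery would pass: signal number and frame.
        signal.default_int_handler(signal.SIGINT, None)
    except KeyboardInterrupt:
        return True
    return False


if __name__ == "__main__":
    print(sigint_default_raises())
    # In an ordinary interactive process this is usually default_int_handler.
    print(signal.getsignal(signal.SIGINT))
```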

3. The exception propagates, and the finally block in asyncio.run runs, calling asyncio.runners._cancel_all_tasks()

# asyncio.runners
def run(main, *, debug=None):
    ...
    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        if debug is not None:
            loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)  # <---- this is called
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            events.set_event_loop(None)
            loop.close()

4. asyncio.runners._cancel_all_tasks() cancels every task returned by asyncio.all_tasks

# asyncio/runners.py
def _cancel_all_tasks(loop):
    to_cancel = tasks.all_tasks(loop)  # <---- gets all running tasks
    if not to_cancel:                  # internally list of weakref.WeakSet '_all_tasks'
        return

    for task in to_cancel:  # <---- cancels all of it
        task.cancel()

    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
    ...
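
The effect of that cleanup step can be observed without any signal at all. The following is a small sketch with made-up names (`background`, `run_demo`) in which the main coroutine returns while another task is still sleeping, and asyncio.run() cancels the leftover task on the way out.

```python
import asyncio

events = []


async def background():
    """Hypothetical long-running task that records its own cancellation."""
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        events.append("background cancelled")
        raise


async def main():
    task = asyncio.create_task(background())
    await asyncio.sleep(0)  # yield once so the background task actually starts
    events.append("main done")
    # Returning here leaves `task` unfinished; asyncio.run() cleans it up.


def run_demo():
    events.clear()
    asyncio.run(main())
    return list(events)


if __name__ == "__main__":
    print(run_demo())
```

The background task never gets its 60 seconds: it is cancelled as soon as the main coroutine finishes, which is the same path a KeyboardInterrupt ends up on.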

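When execution ends, successfully or not, any remaining tasks eventually get cancelled in step 4.
asyncio.shield won't help either, because shielded tasks are also added to _all_tasks.
However, if we add a custom handler, things work a bit differently.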
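
To make the point about asyncio.shield concrete, here is a minimal sketch (the function name `shield_demo` is made up). It wraps a task in asyncio.shield, confirms that the inner task is still listed by asyncio.all_tasks(), and then cancels every task except the current one, roughly mimicking what _cancel_all_tasks does.

```python
import asyncio


async def shield_demo():
    inner = asyncio.create_task(asyncio.sleep(60))
    outer = asyncio.shield(inner)  # protects inner only from outer.cancel()
    await asyncio.sleep(0)         # let the inner task start

    # The shielded task is an ordinary task, so it is still listed.
    listed = inner in asyncio.all_tasks()

    # Rough imitation of _cancel_all_tasks: cancel everything but ourselves.
    for task in asyncio.all_tasks() - {asyncio.current_task()}:
        task.cancel()

    await asyncio.gather(inner, outer, return_exceptions=True)
    return listed, inner.cancelled()


if __name__ == "__main__":
    print(asyncio.run(shield_demo()))
```

The shield only stops a cancellation that arrives through the outer future. Calling cancel() on the inner task directly, as the cleanup loop does, goes straight past it.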

With a custom signal handler

1. We add a custom signal handler via loop.add_signal_handler

# asyncio/unix_events.py
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    ...
    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        ...
        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle  # <---- added to sig handler dict
        ...

2. SIGINT is received
3. Somehow the event loop's _handle_signal gets called, which fetches the matching signal handler from the dictionary and adds it as a callback

# asyncio/unix_events.py
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    ...
    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)  # <---- fetches added handler
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)
        else:
            self._add_callback_signalsafe(handle)  # <---- adds as callback
    ...

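4. Our custom callback is called
This time the default signal handler is not invoked, so KeyboardInterrupt is never raised, and the try-finally block in asyncio.run does not move on to finally. Therefore asyncio.runners._cancel_all_tasks is not called.
All the tasks finally survive! Manually cancel the tasks that don't need protecting inside the handler and we're good to go.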
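
The flow described above can be exercised from inside a single process. This is a sketch under two assumptions: a Unix event loop running in the main thread, and Python 3.8+ for signal.raise_signal. The function name `handler_runs_as_callback` is made up. The process sends SIGINT to itself after installing a handler, and the handler runs as an ordinary loop callback instead of KeyboardInterrupt being raised.

```python
import asyncio
import signal


async def handler_runs_as_callback():
    loop = asyncio.get_running_loop()
    received = asyncio.Event()

    # Replaces the default SIGINT handler for as long as it is installed.
    loop.add_signal_handler(signal.SIGINT, received.set)
    try:
        signal.raise_signal(signal.SIGINT)  # deliver SIGINT to this process
        # No KeyboardInterrupt here; the loop schedules received.set() instead.
        await asyncio.wait_for(received.wait(), timeout=5)
    finally:
        loop.remove_signal_handler(signal.SIGINT)
    return received.is_set()


if __name__ == "__main__":
    print(asyncio.run(handler_runs_as_callback()))
```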
