Python asyncio.create_task()-真的需要保留引用吗?

zyfwsgd6  于 2023-08-08  发布在  Python
关注(0)|答案(3)|浏览(111)

asyncio.create_task()的文档指出了以下警告:

重要:保存对该函数结果的引用,以避免任务在执行过程中消失。(资料来源)

我的问题是:这是真的吗?
我有几个IO绑定的“fire and forget”任务,我想使用asyncio并发运行,方法是使用asyncio.create_task()将它们提交到事件循环。然而,我并不真正关心协程的返回值,甚至它们是否成功运行,只是它们最终会运行。一个用例是将数据从“昂贵的”计算写回Redis数据库。如果Redis可用,那就太好了。如果没有,哦,好吧,没有伤害。这就是为什么我不想/不需要await这些任务。
这里是一个通用示例:

import asyncio

async def fire_and_forget_coro():
    """Some random coroutine waiting for IO to complete."""
    print('in fire_and_forget_coro()')
    await asyncio.sleep(1.0)
    print('fire_and_forget_coro() done')

async def async_main():
    """Main entry point of asyncio application."""
    print('in async_main()')
    n = 3
    for _ in range(n):
        # create_task() does not block, returns immediately.
        # Note: We do NOT save a reference to the submitted task here!
        asyncio.create_task(fire_and_forget_coro(), name='fire_and_forget_coro')

    print('awaiting sleep in async_main()')
    await asyncio.sleep(2.0) # <-- note this line
    print('sleeping done in async_main()')

    print('async_main() done.')

    # all references of tasks we *might* have go out of scope when returning from this coroutine!
    return

if __name__ == '__main__':
    asyncio.run(async_main())

字符串
输出量:

in async_main()
awaiting sleep in async_main()
in fire_and_forget_coro()
in fire_and_forget_coro()
in fire_and_forget_coro()
fire_and_forget_coro() done
fire_and_forget_coro() done
fire_and_forget_coro() done
sleeping done in async_main()
async_main() done.


当注解掉await asyncio.sleep()行时,我们永远看不到fire_and_forget_coro()完成。这是预期的:当以asyncio.run()开始的事件循环关闭时,任务将不再执行。但似乎只要事件循环仍在运行,所有任务都将得到处理,即使我从未显式创建对它们的引用。这对我来说似乎是合乎逻辑的,因为事件循环本身 * 必须 * 引用所有计划的任务才能运行它们。我们甚至可以使用asyncio.all_tasks()来获取它们!
所以,我认为我可以相信Python至少有一个强引用到每个计划任务,只要它提交的事件循环仍然在运行,因此我不必自己管理引用。但我想听听别人的意见。我是对的吗?还是有我还没有认识到的陷阱?
如果我是对的,为什么在文档中明确警告?如果你不保留对它的引用,那么它就是垃圾收集,这是Python的一个常见做法。是否存在没有运行事件循环但仍有一些任务对象要引用的情况?也许是在手动创建事件循环时(从未这样做过)?

lb3vh1jj

lb3vh1jj1#

在github上的cpython bug tracker上有一个关于这个主题的公开问题,我刚刚发现:https://github.com/python/cpython/issues/88831
报价:
asyncio只保留对活动任务的弱引用(在_all_tasks中)。如果用户没有保持对任务的引用并且任务当前没有执行或休眠,则用户可以得到“任务已被销毁,但它正在挂起!“.
所以我的问题的答案很不幸是肯定的。人们必须保持对计划任务的引用。
然而,github问题也描述了一个相对简单的解决方法:将所有正在运行的任务保存在set()中,并为任务添加一个回调,该回调将再次从set()中删除。

running_tasks = set()
# [...]
task = asyncio.create_task(some_background_function())
running_tasks.add(task)
task.add_done_callback(lambda t: running_tasks.remove(t))

字符串

nue99wik

nue99wik2#

在python3.11中,有一个新的API asyncio.TaskGroup.create_task
它做the other answer提到的事情,所以你不需要自己做。

hrirmatl

hrirmatl3#

当我意识到越来越多的人(包括我)正在努力理解“* 为什么 *”他们需要保留对任务的引用,因为他们的代码已经运行得很好了,我打算解释一下幕后发生了什么,并在不同的步骤中给予更多关于引用的信息,并显示他们的代码何时工作,何时不工作。
如果你沿着,我的翻译版本是3.11。
让我们从创建任务开始。当我们将一个协程传递给asyncio.create_task()函数时,它会创建一个Task对象(此处):

...
            task = tasks.Task(coro, loop=self, name=name, context=context)
...

字符串
Task的初始化器中,我们有这样一行(在这里):

...
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)
...


第二行将任务注册到WeakSet()(对任务的弱引用)中。引用计数机制不考虑这一点。
你看到那个self.__step了吗?这是一个回调。它实际上是一个引用了新创建的Task对象的方法。我们将其传递给call_soon()方法,该方法调度回调(通过Handle对象)的执行。(此处):

...
        handle = events.Handle(callback, args, self, context)
        ...
        self._ready.append(handle)
...


self._ready是什么?它是事件循环从其中获取回调的最后一个队列:(此处):

...
            handle = self._ready.popleft()
            ...
            else:
                handle._run()
...


因此,到目前为止,在回调从队列中弹出之前,任务的引用关系如下:

eventloop -> self._ready -> Handle -> callback -> task object


有一个对我们的任务对象的strong引用,它阻止了对它的垃圾回收。
到目前为止一切顺利。当事件循环的一个周期运行时会发生什么?self._ready中不再有回调,唯一的强引用现在消失了吗?在这儿等着...
通常,我们在任务中等待一个可等待的对象--一个Future对象,它大多数时候是从IO调用返回的。当我们的回调运行时,它会生成一个Future对象:(此处)

...
                result = coro.send(None)
...


而asyncio接收Future并将回调添加到它的“done callbacks"中:(此处):

result.add_done_callback(
                            self.__wakeup, context=self._context)


同样,回调/方法(self.__weakeup)具有对Task对象的引用。下面是这篇文章中最重要的部分:
产生的Future对象具有对Task对象的指涉。但它能自己存活下来吗?对Future对象本身的引用是什么?只要它有强引用,我们的Task就可以毫无问题地生存,否则我们的任务也会被垃圾收集。
我将展示三个场景来了解它的实际应用。

1.未来对象 * 无法 * 生存:

假设我们在Task中创建了Future对象:

import asyncio
import gc

async def coro1():
    while True:
        print("just printing...")
        await asyncio.sleep(1)
        gc.collect()

async def coro2():
    loop = asyncio.get_running_loop()
    f = loop.create_future()
    print("inside coro2 - going to wait for future")
    await f
    print("inside coro2 - future resolved")

async def main():
    t1 = asyncio.create_task(coro1()) # This task has a reference.
    asyncio.create_task(coro2())      # This task doesn't.
    await asyncio.sleep(5)

asyncio.run(main())


产出:

just printing...
inside coro2 - going to wait for future
Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<coro2() done, defined at ...> wait_for=<Future pending cb=[Task.task_wakeup()]>>
just printing...
just printing...
just printing...
just printing...


发生什么事了?正如我上面提到的,asyncio收到了Future对象,并将self.__wakeup添加到它的“done回调”中,但是对这个Future对象的唯一引用是什么?它只在我们的Task中被引用!在Task和Future对象之间有一个循环引用,并且没有对Task对象的强引用。在调用gc.collect()之后,Python注意到了这个循环引用,并删除了我们的Task。

2.未来的物体 * 可以 * 存活:

我将在coro2()协程中添加一行->使f成为一个全局变量:

async def coro2():
    global f    # <---------------
    loop = asyncio.get_running_loop()
    f = loop.create_future()
    print("inside coro2 - going to wait for future")
    await f
    print("inside coro2 - future resolved")


产出:

just printing...
inside coro2 - going to wait for future
just printing...
just printing...
just printing...
just printing...


现在,“is”是对Future对象的强引用。关系如下:

Global namespace -> Future object -> self._callbacks -> callback -> Task object

3.未来对象 * 可以 * 生存(真实的世界示例):

通常,我们自己不会去创造Future对象。假设我们有一个简单的echo服务器,它异步侦听传入的连接:

import asyncio
import socket
import gc

async def echo(connection, loop):
    while data := await loop.sock_recv(connection, 512):
        gc.collect()
        await loop.sock_sendall(connection, data)

async def listen_for_connections(server_socket, loop):
    while True:
        gc.collect()
        client_socket, client_address = await loop.sock_accept(server_socket)
        client_socket.setblocking(False)
        print(f"received a connection from {client_address}")
        asyncio.create_task(echo(client_socket, loop))  # no reference to this task

async def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_address = ("127.0.0.1", 8000)
    server_socket.setblocking(False)
    server_socket.bind(server_address)
    server_socket.listen()

    await listen_for_connections(server_socket, asyncio.get_running_loop())

asyncio.run(main())


现在,我们的echo任务将发生什么变化?
await loop.sock_recv(connection, 512)创建了一个Future对象:(此处)

async def sock_recv(self, sock, n):
    ...
    try:
        return sock.recv(n)
    except (BlockingIOError, InterruptedError):
        pass
    fut = self.create_future()
    fd = sock.fileno()
    handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
    fut.add_done_callback(functools.partial(self._sock_read_done, fd, handle=handle))
    return await fut


你看到那个await fut了吗?它将生成一个Future对象。这和第一种情况相似吗?有没有任何其他关于未来的说法?让我们来看看self._add_reader()的作用:(此处)

def _add_reader(self, fd, callback, *args):
        ...
        handle = events.Handle(callback, args, self, None)
        ...
        self._selector.register(fd, selectors.EVENT_READ,(handle, None))
        ...
        return handle


酷,我们的fut对象存储在args参数中,它作为数据注册到选择器中。
其关系是:

selector -> handle -> args -> fut -> task object.


我试图解释在哪些情况下任务将得到垃圾收集,在哪些情况下它们可以生存,但毕竟,我强烈建议您注意文档中强调的内容:

重要提示:保存对该函数结果的引用,以避免任务在执行过程中消失。事件循环只保留对任务的弱引用。没有在其他地方引用的任务可能会在任何时候被垃圾收集,甚至在它完成之前。

相关问题