Run the event loop until all tasks are blocked in Python

w6lpcovy posted on 2023-02-18 in Python

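I'm writing some long-running coroutines that interact with each other. These coroutines can block on an await until something happens externally. I want to be able to drive these coroutines in unit tests. The usual approach of simply awaiting a coroutine doesn't work, because I want to be able to intercept things in the middle of their operation. I'd also rather not mess with the coroutines' internals, unless there's something generic/reusable that can be done.
Ideally, I'd like to *run an event loop until all the tasks are blocked*. This should be easy to determine inside the event loop implementation. Once all the tasks are blocked, the event loop gives up control, and at that point I can assert things about the coroutines' state and poke them from outside. Then I can continue the loop until it blocks again. This would allow deterministic simulation of the tasks in the event loop.
Minimal example of the desired API: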

import asyncio
from asyncio import Event

# Imagine this is a complicated "main" with many coroutines.
# But event is some external "mockable" event
# that can be used to drive in unit tests
async def wait_on_event(event: Event):
  print("Waiting on event")
  await event.wait()
  print("Done waiting on event")

def test_deterministic():
  loop = asyncio.get_event_loop()
  event = Event()
  task = loop.create_task(wait_on_event(event))
  run_until_blocked_or_complete(loop) # define this magic function
  # Should print "Waiting on event"

  # can make some test assertions here
  event.set()

  run_until_blocked_or_complete(loop)
  # Should print "Done waiting on event"

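Is anything like this possible? Or does it require writing a custom event loop just for testing?
Also, I'm currently on Python 3.9 (an AWS runtime constraint). If this can't be done in Python 3.9, which version would support it?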

Answer #1 (lqfhib0f)

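This question has been bugging me ever since I first read it, because it can almost be done with standard asyncio functions. The key is Alexandria's "magic" is_not_blocked method, which I reproduce verbatim below (apart from moving it out to the outer indentation level). I also used his wait_on_event method and his test_deterministic_loop function. I added some extra tests to show how to start and stop other tasks, and how to drive the event loop step by step until all tasks are finished.
Instead of the DeterministicLoop class, I used a function, run_until_blocked, that only makes standard asyncio calls.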

loop.call_soon(loop.stop)
loop.run_forever()

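is a convenient way to advance the loop by one cycle, and asyncio already provides a way to get all the tasks running in a given event loop (asyncio.all_tasks), so there's no need to store them separately.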
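To see what a single cycle does, here is a small sketch (the step and counter names are illustrative, not asyncio APIs). It assumes the standard asyncio loop behavior of running only the callbacks that were already ready when an iteration started, so a coroutine that yields with await asyncio.sleep(0) advances by exactly one await per call:

```python
import asyncio


def step(loop: asyncio.AbstractEventLoop) -> None:
    """Run exactly one iteration of the event loop (illustrative helper)."""
    # stop() is queued behind everything that is already ready, so
    # run_forever() handles those callbacks and then returns.
    loop.call_soon(loop.stop)
    loop.run_forever()


async def counter(log: list) -> str:
    for i in range(3):
        log.append(i)
        await asyncio.sleep(0)  # bare yield: resume on the next iteration
    return "finished"


loop = asyncio.new_event_loop()
log: list = []
task = loop.create_task(counter(log))

step(loop)
print(log)  # [0]
step(loop)
print(log)  # [0, 1]
step(loop)
step(loop)
print(log, task.done())  # [0, 1, 2] True
loop.close()
```

Each call to step moves the coroutine forward by one await; the fourth call lets it return.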
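A comment on Alexandria's "magic" method: if you look at the comments in the asyncio tasks.py source, the "private" variable _fut_waiter is described as part of an important invariant. That is very unlikely to change in future versions, so I think it's fairly safe in practice.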
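A sketch of what that invariant looks like from outside the task, assuming CPython (both the pure-Python and the C Task expose _fut_waiter, but it is private and undocumented). The waiter_of and step helpers are hypothetical names used only for this illustration:

```python
import asyncio


def waiter_of(task: asyncio.Task):
    # Private, CPython-specific attribute: the Future the task is suspended
    # on, or None when the task is runnable (or has not started yet).
    return getattr(task, "_fut_waiter", None)


def step(loop: asyncio.AbstractEventLoop) -> None:
    # Run exactly one iteration of the loop.
    loop.call_soon(loop.stop)
    loop.run_forever()


async def wait_on_event(event: asyncio.Event) -> int:
    await event.wait()
    return 42


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)  # Python 3.9: Event() binds to the current loop
event = asyncio.Event()
task = loop.create_task(wait_on_event(event))

print(waiter_of(task))  # None: scheduled but not started
step(loop)
fut = waiter_of(task)
print(fut is not None and not fut.done())  # True: blocked on event.wait()
event.set()
print(fut.done())  # True: the waiter is resolved, so the task can run again
step(loop)
print(task.result())  # 42
loop.close()
```

A task counts as "blocked" exactly when its waiter exists and is not done, which is what _is_not_blocked checks.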

import asyncio
from typing import Optional, cast

def _is_not_blocked(task: asyncio.Task):
    # pylint: disable-next=protected-access
    wait_for = cast(Optional[asyncio.Future], task._fut_waiter)  # type: ignore
    if wait_for is None:
        return True
    return wait_for.done()

def run_until_blocked():
    """Runs steps of the event loop until all tasks are blocked."""
    loop = asyncio.get_event_loop()
    # Always run one step.
    loop.call_soon(loop.stop)
    loop.run_forever()
    # Continue running until all tasks are blocked
    while any(_is_not_blocked(task) for task in asyncio.all_tasks(loop)):
        loop.call_soon(loop.stop)
        loop.run_forever()
        
# This coroutine could spawn many others. Keeping it simple here
async def wait_on_event(event: asyncio.Event) -> int:
    print("Waiting")
    await event.wait()
    print("Done")
    return 42

def test_deterministic_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    event = asyncio.Event()
    task = loop.create_task(wait_on_event(event))
    assert not task.done()
    run_until_blocked()
    print("Task done", task.done())
    assert not task.done()
    print("Tasks running", asyncio.all_tasks(loop))
    assert asyncio.all_tasks(loop)
    event.set()
    # You can start and stop tasks
    loop.run_until_complete(asyncio.sleep(2.0))
    run_until_blocked()
    print("Task done", task.done())
    assert task.done()
    print("Tasks running", asyncio.all_tasks(loop))
    assert task.result() == 42
    assert not asyncio.all_tasks(loop)
    # A task sleeping on a timer counts as blocked, so run_until_blocked()
    # returns before the timer fires; call it in a loop until the task is done.
    task2 = loop.create_task(asyncio.sleep(2.0))
    assert not task2.done()
    while not task2.done():
        assert asyncio.all_tasks(loop)
        run_until_blocked()
    assert task2.done()
    assert not asyncio.all_tasks(loop)
    
test_deterministic_loop()
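
The same approach works for coroutines that interact through other asyncio primitives, not just Event. A sketch with a consumer draining an asyncio.Queue; the consumer coroutine is a made-up example, and run_until_blocked is a copy of the helper above that takes the loop as a parameter instead of calling get_event_loop(). It still relies on the private _fut_waiter attribute:

```python
import asyncio
from typing import List, Optional


def _is_not_blocked(task: asyncio.Task) -> bool:
    # Relies on the private CPython attribute _fut_waiter.
    wait_for: Optional[asyncio.Future] = getattr(task, "_fut_waiter", None)
    return wait_for is None or wait_for.done()


def run_until_blocked(loop: asyncio.AbstractEventLoop) -> None:
    # Always run one step, then keep stepping while any task can progress.
    loop.call_soon(loop.stop)
    loop.run_forever()
    while any(_is_not_blocked(t) for t in asyncio.all_tasks(loop)):
        loop.call_soon(loop.stop)
        loop.run_forever()


async def consumer(queue: asyncio.Queue, seen: List[int]) -> None:
    while True:
        item = await queue.get()
        seen.append(item * 10)


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)  # Python 3.9: Queue() binds to the current loop
queue: asyncio.Queue = asyncio.Queue()
seen: List[int] = []
task = loop.create_task(consumer(queue, seen))

run_until_blocked(loop)
print(seen)  # []: the consumer is parked in queue.get()
queue.put_nowait(1)
queue.put_nowait(2)
run_until_blocked(loop)
print(seen)  # [10, 20]
task.cancel()
run_until_blocked(loop)
print(task.cancelled())  # True
loop.close()
```

Cancelling the long-running consumer at the end lets the loop finish with no pending tasks.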

Answer #2 (1yjd4xko)

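Yes, you can do this by creating a custom event loop policy and using a mock event loop in your tests. The basic idea is to create a loop that only runs until all the coroutines are blocked, then hands control back to the test code so it can make any necessary assertions or poke things from outside, and then continues running the loop until everything is blocked again, and so on.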

import asyncio


def _is_blocked(task):
    # CPython implementation detail: while a task is suspended on a future,
    # that future is kept in the private attribute _fut_waiter.
    waiter = getattr(task, "_fut_waiter", None)
    return waiter is not None and not waiter.done()


class DeterministicEventLoop(asyncio.SelectorEventLoop):
    def run_until_blocked(self, coro=None):
        """Schedule coro (if given), then step the loop until every task is blocked."""
        task = self.create_task(coro) if coro is not None else None
        while True:
            # Run exactly one iteration of the loop.
            self.call_soon(self.stop)
            self.run_forever()
            # Done once no pending task can make progress (or none are left).
            if all(_is_blocked(t) for t in asyncio.all_tasks(self)):
                return task


class DeterministicEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self):
        return DeterministicEventLoop()

This policy makes new_event_loop() return a DeterministicEventLoop, a SelectorEventLoop subclass with an extra run_until_blocked method. The policy itself has no hooks that are called when tasks start or finish, so the blocked state is read from each task instead: a task is blocked when its private _fut_waiter future exists and has not completed yet.
The run_until_blocked method takes a coroutine, schedules it as a task, and then advances the loop one iteration at a time (call_soon(self.stop) followed by run_forever()) until every pending task is blocked or no tasks are left. When it returns, you can make any necessary assertions or poke things from outside.
Here is an example of using this policy:

async def wait_on_event(event: asyncio.Event):
    print("Waiting on event")
    await event.wait()
    print("Done waiting on event")

def test_deterministic():
    asyncio.set_event_loop_policy(DeterministicEventLoopPolicy())

    event = asyncio.Event()
    asyncio.get_event_loop().run_until_blocked(wait_on_event(event))
    assert not event.is_set()  # assert that the event has not been set yet

    event.set()  # set the event
    asyncio.get_event_loop().run_until_blocked(wait_on_event(event))
    assert event.is_set()  # assert that the event has been set

    asyncio.get_event_loop().close()

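In this test, we create a new Event object and pass it to the wait_on_event coroutine. We use the run_until_blocked method to run the coroutine until it blocks on the event.wait() call. At that point we can make any necessary assertions, such as checking that the event has not been set yet. Then we set the event and call run_until_blocked again, which lets the coroutine resume and run to completion.
This pattern allows deterministic simulation of tasks in an event loop and can be used to test coroutines that block on external events.
Hope this helps!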

Answer #3 (euoag5mw)

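The default event loop simply runs everything that was scheduled in each "pass". If you use loop.call_soon to schedule a pause after your tasks have run, you'll be called at the point you want: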

import asyncio

def event():
    print("blah")
    breakpoint()
    print("bleh")

async def worker(id):
    print(f"starting task {id}")
    await asyncio.sleep(0.1)
    print(f"ending task {id}")

async def main():
    t = []
    for id in (1,2,3):
        t.append(asyncio.create_task(worker(id)))
    loop = asyncio.get_running_loop()
    # Queued behind the three task steps, so it runs once each worker
    # has reached its first await.
    loop.call_soon(event)
    await asyncio.sleep(0.2)

Running this in the REPL:

In [8]: asyncio.run(main())
starting task 1
starting task 2
starting task 3
blah
> <ipython-input-3-450374919d79>(4)event()
-> print("bleh")
(Pdb) 
Exception in callback event() at <ipython-input-3-450374919d79>:1
[...]
bdb.BdbQuit
ending task 1
ending task 2
ending task 3
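
The same ordering can be checked without stopping in pdb. This sketch records events in a list instead of calling breakpoint(); the log parameter and the gather() at the end are changes made for illustration so the run is non-interactive and finishes deterministically:

```python
import asyncio
from typing import List


async def worker(n: int, log: List[str]) -> None:
    log.append(f"start {n}")
    await asyncio.sleep(0.1)
    log.append(f"end {n}")


async def main() -> List[str]:
    log: List[str] = []
    tasks = [asyncio.create_task(worker(n, log)) for n in (1, 2, 3)]
    # Queued behind the three task steps, so it runs after each worker
    # has reached its first await.
    asyncio.get_running_loop().call_soon(log.append, "checkpoint")
    await asyncio.gather(*tasks)
    return log


result = asyncio.run(main())
print(result[:4])  # ['start 1', 'start 2', 'start 3', 'checkpoint']
```

Because call_soon callbacks run in FIFO order, the checkpoint lands after every worker's first step but before any timer fires.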

Answer #4 (55ooxyrt)

After some experimentation, I came up with something. Here's the usage:

# This coroutine could spawn many others. Keeping it simple here
async def wait_on_event(event: asyncio.Event) -> int:
    print("Waiting")
    await event.wait()
    print("Done")
    return 42

def test_deterministic_loop():
    loop = DeterministicLoop()
    event = asyncio.Event()
    task = loop.add_coro(wait_on_event(event))

    assert not task.done()
    loop.step()
    # prints Waiting
    assert not task.done()
    assert not loop.done()

    event.set()
    loop.step()
    # prints Done
    assert task.done()
    assert task.result() == 42
    assert loop.done()

Implementation:

"""Module for testing facilities. Don't use these in production!"""
import asyncio
from enum import IntEnum
from typing import Any, Optional, TypeVar, cast
from collections.abc import Coroutine, Awaitable

def _get_other_tasks(loop: Optional[asyncio.AbstractEventLoop]) -> set[asyncio.Task]:
    """Get a set of currently scheduled tasks in an event loop that are not the current task"""
    current = asyncio.current_task(loop)
    tasks = asyncio.all_tasks(loop)
    if current is not None:
        tasks.discard(current)
    return tasks


# Works on python 3.9, cannot guarantee on other versions
def _get_unblocked_tasks(tasks: set[asyncio.Task]) -> set[asyncio.Task]:
    """Get the subset of tasks that can make progress. This is the most magic
    function, and is heavily dependent on eventloop implementation and python version"""

    def is_not_blocked(task: asyncio.Task):
        # pylint: disable-next=protected-access
        wait_for = cast(Optional[asyncio.Future], task._fut_waiter)  # type: ignore
        if wait_for is None:
            return True
        return wait_for.done()

    return set(filter(is_not_blocked, tasks))

class TasksState(IntEnum):
    RUNNING = 0
    BLOCKED = 1
    DONE = 2

def _get_tasks_state(
    prev_tasks: set[asyncio.Task], cur_tasks: set[asyncio.Task]
) -> TasksState:
    """Given set of tasks for previous and current pass of the event loop,
    determine the overall state of the tasks. Are the tasks making progress,
    blocked, or done?"""
    if not cur_tasks:
        return TasksState.DONE

    unblocked: set[asyncio.Task] = _get_unblocked_tasks(cur_tasks)
    # check if there are tasks that can make progress
    if unblocked:
        return TasksState.RUNNING

    # if no tasks appear to make progress, check if this and last step the state
    # has been constant
    elif prev_tasks == cur_tasks:
        return TasksState.BLOCKED

    return TasksState.RUNNING

async def _stop_when_blocked():
    """Schedule this task to stop the event loop when all other tasks are
    blocked, or they all complete"""
    prev_tasks: set[asyncio.Task] = set()
    loop = asyncio.get_running_loop()
    while True:
        tasks = _get_other_tasks(loop)
        state = _get_tasks_state(prev_tasks, tasks)
        prev_tasks = tasks

        # stop the event loop if all other tasks cannot make progress
        if state == TasksState.BLOCKED:
            loop.stop()

        # finish this task too, if no other tasks exist
        if state == TasksState.DONE:
            break

        # yield back to the event loop
        await asyncio.sleep(0.0)

    loop.stop()

T = TypeVar("T")

class DeterministicLoop:
    """An event loop for writing deterministic tests."""

    def __init__(self):
        self.loop = asyncio.get_event_loop_policy().new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.stepper_task = self.loop.create_task(_stop_when_blocked())
        self.tasks: list[asyncio.Task] = []

    def add_coro(self, coro: Coroutine[Any, Any, T]) -> asyncio.Task[T]:
        """Add a coroutine to the set of running coroutines, so they can be stepped through"""
        if self.done():
            raise RuntimeError("No point in adding more tasks. All tasks have finished")
        task = self.loop.create_task(coro)
        self.tasks.append(task)
        return task

    def step(self, awaitable: Optional[Awaitable[T]] = None) -> Optional[T]:
        if self.done() or not self.tasks:
            raise RuntimeError(
                "No point in stepping. No tasks to step or all are finished"
            )

        step_future: Optional[asyncio.Future[T]] = None
        if awaitable is not None:
            step_future = asyncio.ensure_future(awaitable, loop=self.loop)

        # stepper_task should halt us if we're blocked or all tasks are done
        self.loop.run_forever()

        if step_future is not None:
            assert (
                step_future.done()
            ), "Can't step the event loop, where the step function itself might get blocked"
            return step_future.result()
        return None

    def done(self) -> bool:
        return self.stepper_task.done()

Answer #5 (6jjcrrmo)

import asyncio

async def wait_on_event(event: asyncio.Event):
    print("Waiting on event")
    await event.wait()
    print("Done waiting on event")
    return 42

# Registry of every Event created, so run_until_blocked can count waiters.
_events = []

class Event(asyncio.Event):
    def __init__(self):
        super().__init__()
        _events.append(self)

def run_until_blocked(loop):
    while True:
        # Advance the loop by one iteration.
        loop.call_soon(loop.stop)
        loop.run_forever()

        # Stop once every pending task is parked on one of our events.
        waiters = sum(len(event._waiters) for event in _events)
        tasks = len(asyncio.all_tasks(loop=loop))
        if waiters == tasks:
            break

loop = asyncio.get_event_loop()
event = Event()
coro = wait_on_event(event)
task = loop.create_task(coro)

run_until_blocked(loop)
event.set()
run_until_blocked(loop)

print(task.result())
