python-3.x: Why does stepping an async iterator in separate tasks behave differently than without tasks?

Posted by bkkx9g8r on 2023-10-21 in Python

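I just discovered a strange behavior and I'm out of ideas.
My script asynchronously iterates over the stdout stream of a process that is spawned, in a somewhat complicated way, via ssh (asyncssh). *Sometimes* that stream gets truncated, i.e. I get StopAsyncIteration when I expect more to come.
I have now found the culprit: the following lines read from the stream line by line, using separate tasks (in order to implement a timeout, but I removed the corresponding code):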

async def add_next(generator, collector) -> bool:
    with suppress(StopAsyncIteration):
        collector.append(await anext(generator))
        return True
    return False

async def thrtl(generator):
    bucket = []
    while True:
        if not await asyncio.create_task(add_next(generator, bucket)):
            break
    yield bucket

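(Note that this code doesn't make much sense; it only reproduces my observation.)
The code above truncates generator because awaiting the next element happens in a different task each time. If I just await add_next directly, everything works as expected: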

async def thrtl(generator):
    bucket = []
    while True:
        if not await add_next(generator, bucket):
            break
    yield bucket

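The problem does not occur with every iterable, e.g. streams created with create_subprocess_exec work fine. If I connect to a remote machine over a slow connection, I even get more lines, and the log shows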

INFO:asyncssh:[conn=0, chan=1] Received exit status 0
INFO:asyncssh:[conn=0, chan=1] Received channel close
INFO:asyncssh:[conn=0, chan=1] Channel closed

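right before I get StopAsyncIteration. So reading from the stream after the process has terminated seems to be part of the problem.
I can live with that and try to read from the stream differently, but why does it work without creating tasks?
Here is a complete script for reference. Note that the effect is easier to observe with a remote connection (rather than connecting to 'localhost'):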

import asyncio, asyncssh, logging
from contextlib import suppress

async def add_next(generator, collector) -> bool:
    with suppress(StopAsyncIteration):
        collector.append(await anext(generator))
        return True
    return False

async def thrtl(generator):
    bucket = []
    while True:
        # good:
        #if not await add_next(generator, bucket):
        # bad:
        if not await asyncio.create_task(add_next(generator, bucket)):
            break
    yield bucket

async def streamhandler(stream):
    result = []
    async for rawlines in thrtl(aiter(stream)):
        result += rawlines
    return result
    
async def main():
    ssh_connection = await asyncssh.connect("localhost")

    while True:
        ssh_process = await ssh_connection.create_process("df -P")
        stdout, stderr, completed = await asyncio.gather(
            streamhandler(ssh_process.stdout),
            streamhandler(ssh_process.stderr),
            asyncio.ensure_future(ssh_process.wait()),
        )
        print(stdout)
        print(stderr)
        await asyncio.sleep(2)

logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())

Answer #1, by nfzehxib

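As jsbueno pointed out, this behavior can be avoided by having only one task (i.e. one context) that actually reads from generator, and by using an asyncio.Queue to hand the read items over to other tasks/contexts. One caveat of this approach is exception handling, especially for StopAsyncIteration, because reading from the Queue will never tell you that the generator has been exhausted. I'm still hoping for a more straightforward approach, but this is how I did it: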

async def thrtl(generator):
    async def iterate(generator, queue) -> None:
        while True:
            try:
                queue.put_nowait(await anext(generator))
            except Exception as exc:
                queue.put_nowait(exc)
                break

    async def add_next(queue, collector) -> None:
        elem = await queue.get()
        if isinstance(elem, Exception):
            raise elem
        collector.append(elem)

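    # Start the single reader task; keep a reference so it stays alive.
    queue = asyncio.Queue()
    reader = asyncio.create_task(iterate(generator, queue))
    bucket = []
    while True:
        try:
            # Read from the queue, never from the generator itself.
            await asyncio.create_task(add_next(queue, bucket))
        except StopAsyncIteration:
            break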

    yield bucket
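
A possible variation on the same idea, not the code from this answer: instead of putting exception objects into the queue, the reader task can put a sentinel when it is done, and the consumer can await the reader task to re-raise whatever the source raised. This is only a sketch under that assumption; `read_via_queue`, `pump` and `_END` are hypothetical names.

```python
import asyncio

_END = object()  # hypothetical sentinel marking the end of the stream


async def read_via_queue(source):
    """Yield items from `source`, which is only ever advanced by one task."""
    queue = asyncio.Queue()

    async def pump():
        # Sole consumer of `source`; always signals the end, even on error.
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_END)

    pump_task = asyncio.create_task(pump())
    try:
        while (item := await queue.get()) is not _END:
            yield item
        # Re-raises whatever `source` raised, if anything.
        await pump_task
    finally:
        # Stops the reader if the consumer quits early; no-op once finished.
        pump_task.cancel()
```

With this shape, normal exhaustion ends the `async for` on the consumer side, while a real error in the source surfaces as the original exception.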

The complete function with type hints (more meaningful, but also more verbose) now looks like this:

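import asyncio
from collections.abc import AsyncIterator, MutableSequence, Sequence
from contextlib import suppress
from typing import TypeVar

T = TypeVar("T")


async def collect_chunks(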
    generator: AsyncIterator[T],
    *,
    postpone: bool = False,
    min_interval: float = 2,
    bucket_size: int = 0,
) -> AsyncIterator[Sequence[T]]:
    """Collect elements read from @generator and wait for a given condition before yielding them
    in chunks.
    Condition is met only after @min_interval seconds have passed since
    [1] first element received since last bucket if @postpone is set to False or
    [2] since last received element if @postpone is set to True.
    If @bucket_size > 0 the chunk will be returned immediately regardless of @postpone if the
    number of collected elements has reached @bucket_size.
    """

    async def iterate(generator: AsyncIterator[T], queue: asyncio.Queue[T | Exception]) -> None:
        """Writes elements read from @generator to @queue in order to not access @generator
        from more than one context
        see https://stackoverflow.com/questions/77245398"""
        while True:
            try:
                queue.put_nowait(await anext(generator))
            except Exception as exc:  # pylint: disable=broad-except
                queue.put_nowait(exc)
                break

    async def add_next(queue: asyncio.Queue[T | Exception], collector: MutableSequence[T]) -> None:
        """Reads one element from @queue and puts it into @collector. Together with `iterate`
        this gives us an awaitable read-only-one-element-with-timeout semantic"""
        elem = await queue.get()
        if isinstance(elem, Exception):
            raise elem
        collector.append(elem)

    event_tunnel: asyncio.Queue[T | Exception] = asyncio.Queue()
    collected_events: MutableSequence[T] = []
    fuse_task = None
    tasks = {
        asyncio.create_task(add_next(event_tunnel, collected_events), name="add_next"),
        asyncio.create_task(iterate(generator, event_tunnel), name="iterate"),
    }

    with suppress(asyncio.CancelledError):
        while True:
            finished, tasks = await asyncio.wait(fs=tasks, return_when=asyncio.FIRST_COMPLETED)

            for finished_task in finished:
                if (event_name := finished_task.get_name()) == "add_next":
                    # in case we're postponing we 'reset' the timeout fuse by removing it
                    if postpone and fuse_task:
                        # discard, not remove: the fuse may have finished in this same round
                        tasks.discard(fuse_task)
                        fuse_task.cancel()
                        with suppress(asyncio.CancelledError):
                            await fuse_task
                        del fuse_task
                        fuse_task = None

                    if (exception := finished_task.exception()) or (
                        bucket_size and len(collected_events) >= bucket_size
                    ):
                        if collected_events:
                            yield collected_events
                            collected_events.clear()
                        if exception:
                            if isinstance(exception, StopAsyncIteration):
                                return
                            raise exception

                    tasks.add(
                        asyncio.create_task(
                            add_next(event_tunnel, collected_events), name="add_next"
                        )
                    )
                elif event_name == "fuse":
                    if collected_events:
                        yield collected_events
                        collected_events.clear()
                    del fuse_task
                    fuse_task = None
                else:
                    assert event_name == "iterate"

            # we've had a new event - start the timeout fuse
            if not fuse_task and min_interval > 0:
                tasks.add(
                    fuse_task := asyncio.create_task(asyncio.sleep(min_interval), name="fuse")
                )
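
For comparison, the core "single reader plus timeout" idea can be sketched much more compactly with `asyncio.wait_for` around `queue.get()`. This is a simplified illustration, not a drop-in replacement for `collect_chunks`: it only flushes a chunk after the queue has been idle for `max_idle` seconds and has no `postpone` or `bucket_size` handling. `chunk_by_idle`, `pump`, `max_idle` and `_DONE` are hypothetical names.

```python
import asyncio

_DONE = object()  # hypothetical sentinel: the source is exhausted


async def chunk_by_idle(source, max_idle: float):
    """Yield lists of items; flush whenever nothing arrives for `max_idle` s."""
    queue = asyncio.Queue()

    async def pump():
        # The only task that ever advances `source`.
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_DONE)

    pump_task = asyncio.create_task(pump())
    chunk = []
    try:
        while True:
            try:
                # Timing out only cancels the queue read, never the source.
                item = await asyncio.wait_for(queue.get(), timeout=max_idle)
            except asyncio.TimeoutError:
                if chunk:
                    yield chunk
                    chunk = []  # fresh list, the yielded one stays intact
                continue
            if item is _DONE:
                break
            chunk.append(item)
        if chunk:
            yield chunk
        await pump_task  # surface an error raised by the source, if any
    finally:
        pump_task.cancel()  # no-op if the pump already finished
```

The timeout is applied to the queue read rather than to `anext(source)`, so the source itself is never touched from the consuming context.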

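However, I still haven't read anywhere that I shouldn't access an async generator from more than one context. Considering how exceptions have to be smuggled through the way I did it, I think this is a (design) bug.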
HTH
