我刚发现了一个奇怪的行为,但现在我没办法了。
我的脚本异步迭代一个进程的stdout
流,该进程以一种复杂的方式通过ssh
(asyncssh
)产生。现在 * 有时 * 该流被截断,即。我得到StopAsyncIteration
当我期待更多的来。
现在我找到了罪魁祸首--以下几行是从流中逐行读取的,并带有 separate tasks(为了能够实现超时,但我删除了相应的代码):
async def add_next(generator, collector) -> bool:
with suppress(StopAsyncIteration):
collector.append(await anext(generator))
return True
return False
async def thrtl(generator):
bucket = []
while True:
if not await asyncio.create_task(add_next(generator, bucket)):
break
yield bucket
(请注意,这段代码没有多大意义-它只是复制了我的观察结果)
上面的代码将截断generator
,因为等待下一个元素发生在不同的任务中。如果我只等待add_next
,一切都按预期工作:
async def thrtl(generator):
bucket = []
while True:
if not await add_next(generator, bucket):
break
yield bucket
这个问题并不会发生在每一个可迭代的对象上,例如。使用create_subprocess_exec
创建的流工作正常。如果我连接到一个网速很慢的远程机器,我甚至会得到更多的线路,日志显示
INFO:asyncssh:[conn=0, chan=1] Received exit status 0
INFO:asyncssh:[conn=0, chan=1] Received channel close
INFO:asyncssh:[conn=0, chan=1] Channel closed
就在我拿到StopAsyncIteration
之前因此,在进程终止后从流中阅读似乎是问题的一部分。
我可以接受这一点,并尝试以不同的方式从流中读取,但为什么它在没有创建任务的情况下也能工作呢?
这里是一个完整的脚本供参考-请注意,效果更好地观察与远程连接(而不是连接到'localhost'):
import asyncio, asyncssh, logging
from contextlib import suppress
async def add_next(generator, collector) -> bool:
with suppress(StopAsyncIteration):
collector.append(await anext(generator))
return True
return False
async def thrtl(generator):
bucket = []
while True:
# good:
#if not await add_next(generator, bucket):
# bad:
if not await asyncio.create_task(add_next(generator, bucket)):
break
yield bucket
async def streamhandler(stream):
result = []
async for rawlines in thrtl(aiter(stream)):
result += rawlines
return result
async def main():
ssh_connection = await asyncssh.connect("localhost")
while True:
ssh_process = await ssh_connection.create_process("df -P")
stdout, stderr, completed = await asyncio.gather(
streamhandler(ssh_process.stdout),
streamhandler(ssh_process.stderr),
asyncio.ensure_future(ssh_process.wait()),
)
print(stdout)
print(stderr)
await asyncio.sleep(2)
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
1条答案
按热度按时间nfzehxib1#
正如jsbueno所指出的,这种行为可以通过只有一个任务(
asyncio.Queue
上下文)来避免,该任务实际上从generator
读取,并使用asyncio.Queue
访问来自不同任务/上下文的读取项。这种方法的一个警告是异常处理,特别是StopAsyncIteration
,因为从Queue
进行阅读永远不会让您知道生成器已经被耗尽了。仍然希望有一个更直接的方法,这是我是如何做到的:带有类型提示的完整(更有意义但也更冗长)函数现在看起来像这样:
然而,我还没有读到任何地方,我不应该从多个上下文访问一个PDLC生成器。考虑到异常是如何像我一样被走私的,我认为这是一个(设计)错误。
HTH