python asyncio如何读取StdIn并写入StdOut?

mm9b1k5b  于 2023-06-20  发布在  Python
关注(0)|答案(2)|浏览(150)

我需要异步读取StdIn以获取消息(json由\r\n终止),并在处理异步后将更新的消息写入StdOut。
现在我正在同步地做:

class SyncIOStdInOut():
    def write(self, payload: str):
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        payload=sys.stdin.readline()
        return  payload

如何异步地做同样的事情?

izkcnapc

izkcnapc1#

下面是使用asyncio streams(对于Unix)将stdin回显到stdout的示例。

import asyncio
import sys

async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer

async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(100)
        if not res:
            break
        writer.write(res)
        await writer.drain()

if __name__ == "__main__":
    asyncio.run(main())

作为一个现成的解决方案,您可以使用aioconsole库。它实现了类似的方法,但也提供了inputprintexeccode.interact的其他有用的异步等效方法:

from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()

更新:

让我们试着弄清楚函数connect_stdin_stdout是如何工作的。
1.获取当前事件循环:

loop = asyncio.get_event_loop()

1.创建StreamReader示例。

reader = asyncio.StreamReader()

通常,StreamReader/StreamWriter类不打算直接示例化,而应该仅作为open_connection()start_server()等函数的结果使用。StreamReader为某些数据流提供缓冲异步接口。一些源代码(库代码)调用其函数,如feed_datafeed_eof,数据被缓冲,可以使用文档中的接口协程read()readline()等读取。
1.创建StreamReaderProtocol示例。

protocol = asyncio.StreamReaderProtocol(reader)

这个类是从asyncio.ProtocolFlowControlMixin派生而来的,有助于在ProtocolStreamReader之间进行适配。它覆盖Protocol方法,如data_receivedeof_received,并调用StreamReader方法feed_data
1.在事件循环中注册标准输入流stdin

await loop.connect_read_pipe(lambda: protocol, sys.stdin)

connect_read_pipe函数接受一个类似文件的对象作为pipe参数。stdin是类似文件的对象。从现在开始,从stdin读取的所有数据将落入StreamReaderProtocol,然后传递到StreamReader
1.在事件循环中注册标准输出流stdout

w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)

connect_write_pipe中,您需要传递一个协议工厂,该工厂创建协议示例,为StreamWriter.drain()实现流控制逻辑。这个逻辑在FlowControlMixin类中实现。StreamReaderProtocol也继承了它。
1.创建StreamWriter示例。

writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)

该类使用函数write()writelines()等转发传递给它的数据。到底层的transport
protocol用于支持drain()函数,以等待底层传输已刷新其内部缓冲区并可用于再次写入的时刻。
reader是一个可选参数,可以是None,它也用于支持drain()函数,在这个函数的开始,它会检查是否为读取器设置了异常,例如,由于连接丢失(与套接字和双向连接相关),那么drain()也会抛出异常。
您可以在这个伟大的answer中阅读更多关于StreamWriterdrain()函数的信息。

更新二:

要读取带有\r\n分隔符的行,可以使用readuntil

mpgws1up

mpgws1up2#

这是从stdin异步读取的另一种方式(一次读取一行)。

async def async_read_stdin()->str:
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, sys.stdin.readline)

相关问题