Python异步:在执行其他操作时等待stdin输入

wbgh16ku  于 2023-06-25  发布在  Python
关注(0)|答案(5)|浏览(144)

我正在尝试创建一个WebSocket命令行客户端,它等待来自WebSocket服务器的消息,但同时等待用户输入。
每秒定期轮询多个在线源在服务器上运行良好(在本例中运行在localhost:6789上),但它使用的不是Python的普通sleep()方法,而是asyncio.sleep(),这是有意义的,因为睡眠和异步睡眠不是一回事,至少在后台不是。
类似地,等待用户输入和异步等待用户输入并不是一回事,但是我不知道如何异步等待用户输入,就像我可以异步等待任意秒数一样,这样客户端就可以在等待用户输入的同时处理来自WebSocket服务器的传入消息。
下面monitor_cmd()else-子句中的注解希望能解释我的意思:

import asyncio
import json
import websockets

async def monitor_ws():
    uri = 'ws://localhost:6789'
    async with websockets.connect(uri) as websocket:
        async for message in websocket:
            print(json.dumps(json.loads(message), indent=2, sort_keys=True))

async def monitor_cmd():
    while True:

        sleep_instead = False

        if sleep_instead:
            await asyncio.sleep(1)
            print('Sleeping works fine.')
        else:
            # Seems like I need the equivalent of:
            # line = await asyncio.input('Is this your line? ')
            line = input('Is this your line? ')
            print(line)
try:
    asyncio.get_event_loop().run_until_complete(asyncio.wait([
        monitor_ws(),
        monitor_cmd()
    ]))
except KeyboardInterrupt:
    quit()

这段代码只是无限期地等待输入,在此期间不做任何其他事情,我理解为什么。我不明白的是,怎么解决。:)
当然,如果我以错误的方式思考这个问题,我也很乐意学习如何补救。

nkcskrwz

nkcskrwz1#

大量借鉴aioconsole,如果你不想使用外部库,你可以定义自己的异步输入函数:

async def ainput(string: str) -> str:
    await asyncio.get_event_loop().run_in_executor(
            None, lambda s=string: sys.stdout.write(s+' '))
    return await asyncio.get_event_loop().run_in_executor(
            None, sys.stdin.readline)
u2nhd7ah

u2nhd7ah2#

您可以使用aioconsole第三方包以异步友好的方式与stdin交互:

line = await aioconsole.ainput('Is this your line? ')
l7mqbcuq

l7mqbcuq3#

aioconsole大量借用,有两种处理方法。
1.启动一个新守护进程线程:

import sys
import asyncio
import threading
from concurrent.futures import Future

async def run_as_daemon(func, *args):
    future = Future()
    future.set_running_or_notify_cancel()

    def daemon():
        try:
            result = func(*args)
        except Exception as e:
            future.set_exception(e)
        else:
            future.set_result(result)

    threading.Thread(target=daemon, daemon=True).start()
    return await asyncio.wrap_future(future)

async def main():
    data = await run_as_daemon(sys.stdin.readline)
    print(data)

if __name__ == "__main__":
    asyncio.run(main())

1.使用流读取器:

import sys
import asyncio

async def get_steam_reader(pipe) -> asyncio.StreamReader:
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, pipe)
    return reader

async def main():
    reader = await get_steam_reader(sys.stdin)
    data = await reader.readline()
    print(data)

if __name__ == "__main__":
    asyncio.run(main())
dzhpxtsq

dzhpxtsq4#

Python 3.9引入了asyncio.to_thread,它可以用来简化mfurseman's answer中的代码:

async def ainput(string: str) -> str:
    await asyncio.to_thread(sys.stdout.write, f'{string} ')
    return await asyncio.to_thread(sys.stdin.readline)

请注意,sys.stdin.readlinereturns the newline character'\n',而input则没有。如果你想让ainput排除换行符,我建议做以下修改:

async def ainput(string: str) -> str:
    await asyncio.to_thread(sys.stdout.write, f'{string} ')
    return (await asyncio.to_thread(sys.stdin.readline)).rstrip('\n')
jslywgbw

jslywgbw5#

1.使用Prompt-Toolkit解决方案:

from prompt_toolkit import PromptSession
from prompt_toolkit.patch_stdout import patch_stdout

async def my_coroutine():
    session = PromptSession()
    while True:
        with patch_stdout():
            result = await session.prompt_async('Say something: ')
        print('You said: %s' % result)
  1. asyncio解决方案:
import asyncio
import sys

async def get_stdin_reader() -> asyncio.StreamReader:
    stream_reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(stream_reader)
    loop = asyncio.get_running_loop()
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    return stream_reader

async def main():
    stdin_reader = await get_stdin_reader()
    while True:
        print('input: ', end='', flush=True)
        line = await stdin_reader.readline()
        print(f'your input: {line.decode()}')

asyncio.run(main())

相关问题