python asyncio如何读取StdIn并写入StdOut?

python asyncio how to read StdIn and write to StdOut?

我需要异步读取 StdIn 以获取消息(json 被 \r\n 终止)并在处理异步后将更新的消息写入 StdOut。

目前我正在同步进行,例如:

class SyncIOStdInOut():
    def write(self, payload: str):
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        payload=sys.stdin.readline()
        return  payload

如何异步执行相同的操作?

这是一个使用 asyncio streamsstdin 回显到 stdout 的示例(对于 Unix)。

import asyncio
import sys


async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer


async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(100)
        if not res:
            break
        writer.write(res)


if __name__ == "__main__":
    asyncio.run(main())

作为 ready-to-use 解决方案,您可以使用 aioconsole 库。它实现了类似的方法,但也为 inputprintexeccode.interact:

提供了额外有用的异步等价物
from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()

更新:

让我们试着弄清楚函数 connect_stdin_stdout 是如何工作的。

  1. 获取当前事件循环:
loop = asyncio.get_event_loop()
  1. 创建 StreamReader 个实例。
reader = asyncio.StreamReader()

通常,StreamReader/StreamWriter classes 不打算直接实例化,只能用作 open_connection()start_server() 等函数的结果。 StreamReader 为某些数据流提供缓冲的异步接口。一些源码(库代码)调用它的函数如feed_datafeed_eof,数据被缓冲,可以使用documented接口协程read()readline()读取],等等

  1. 创建 StreamReaderProtocol 个实例。
protocol = asyncio.StreamReaderProtocol(reader)

这个class是从asyncio.ProtocolFlowControlMixin派生出来的,有助于适应ProtocolStreamReader。它覆盖 Protocol 方法,如 data_receivedeof_received 并调用 StreamReader 方法 feed_data.

  1. 在事件循环中注册标准输入流stdin
await loop.connect_read_pipe(lambda: protocol, sys.stdin)

connect_read_pipe 函数将 pipe 对象作为 file-like 参数。 stdin 是一个 file-like 对象。从现在开始,所有从stdin读取的数据都会落入StreamReaderProtocol,然后传入StreamReader

  1. 在事件循环中注册标准输出流stdout
w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)

connect_write_pipe 中,您需要传递一个协议工厂,该工厂创建协议实例,为 StreamWriter.drain() 实现流量控制逻辑。这个逻辑在classFlowControlMixin中实现。也StreamReaderProtocol继承自它。

  1. 创建 StreamWriter 个实例。
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)

此 class 将使用函数 write()writelines() 等传递给它的数据转发到基础 transport.

protocol用于支持drain()函数等待底层传输刷新其内部缓冲区并可再次写入的时刻。

reader是一个可选参数,可以是None,它也用来支持drain()功能,在这个功能开始时检查是否有异常设置为 reader,例如,由于连接丢失(与套接字和双向连接相关),那么 drain() 也会抛出异常。

您可以在这个很棒的 .

中阅读有关 StreamWriterdrain() 函数的更多信息

更新 2:

要读取带有 \r\n 分隔符的行,可以使用 readuntil

这是从标准输入异步读取的另一种方式(一次读取一行)。

async def async_read_stdin()->str:
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, sys.stdin.readline)