python asyncio如何读取StdIn并写入StdOut?
python asyncio how to read StdIn and write to StdOut?
我需要异步读取 StdIn 以获取消息(json 被 \r\n 终止)并在处理异步后将更新的消息写入 StdOut。
目前我正在同步进行,例如:
class SyncIOStdInOut():
def write(self, payload: str):
sys.stdout.write(payload)
sys.stdout.write('\r\n')
sys.stdout.flush()
def read(self) -> str:
payload=sys.stdin.readline()
return payload
如何异步执行相同的操作?
这是一个使用 asyncio streams 将 stdin
回显到 stdout
的示例(对于 Unix)。
import asyncio
import sys
async def connect_stdin_stdout():
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
return reader, writer
async def main():
reader, writer = await connect_stdin_stdout()
while True:
res = await reader.read(100)
if not res:
break
writer.write(res)
if __name__ == "__main__":
asyncio.run(main())
作为 ready-to-use 解决方案,您可以使用 aioconsole 库。它实现了类似的方法,但也为 input
、print
、exec
和 code.interact
:
提供了额外有用的异步等价物
from aioconsole import get_standard_streams
async def main():
reader, writer = await get_standard_streams()
更新:
让我们试着弄清楚函数 connect_stdin_stdout
是如何工作的。
- 获取当前事件循环:
loop = asyncio.get_event_loop()
- 创建
StreamReader
个实例。
reader = asyncio.StreamReader()
通常,StreamReader/StreamWriter
classes 不打算直接实例化,只能用作 open_connection()
和 start_server()
等函数的结果。
StreamReader
为某些数据流提供缓冲的异步接口。一些源码(库代码)调用它的函数如feed_data
、feed_eof
,数据被缓冲,可以使用documented接口协程read()
、readline()
读取],等等
- 创建
StreamReaderProtocol
个实例。
protocol = asyncio.StreamReaderProtocol(reader)
这个class是从asyncio.Protocol
和FlowControlMixin
派生出来的,有助于适应Protocol
和StreamReader
。它覆盖 Protocol
方法,如 data_received
、eof_received
并调用 StreamReader
方法 feed_data
.
- 在事件循环中注册标准输入流
stdin
。
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
connect_read_pipe
函数将 pipe
对象作为 file-like 参数。 stdin
是一个 file-like 对象。从现在开始,所有从stdin
读取的数据都会落入StreamReaderProtocol
,然后传入StreamReader
- 在事件循环中注册标准输出流
stdout
。
w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)
在 connect_write_pipe
中,您需要传递一个协议工厂,该工厂创建协议实例,为 StreamWriter.drain()
实现流量控制逻辑。这个逻辑在classFlowControlMixin
中实现。也StreamReaderProtocol
继承自它。
- 创建
StreamWriter
个实例。
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
此 class 将使用函数 write()
、writelines()
等传递给它的数据转发到基础 transport
.
protocol
用于支持drain()
函数等待底层传输刷新其内部缓冲区并可再次写入的时刻。
reader
是一个可选参数,可以是None
,它也用来支持drain()
功能,在这个功能开始时检查是否有异常设置为 reader,例如,由于连接丢失(与套接字和双向连接相关),那么 drain()
也会抛出异常。
您可以在这个很棒的 .
中阅读有关 StreamWriter
和 drain()
函数的更多信息
更新 2:
要读取带有 \r\n
分隔符的行,可以使用 readuntil
这是从标准输入异步读取的另一种方式(一次读取一行)。
async def async_read_stdin()->str:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, sys.stdin.readline)
我需要异步读取 StdIn 以获取消息(json 被 \r\n 终止)并在处理异步后将更新的消息写入 StdOut。
目前我正在同步进行,例如:
class SyncIOStdInOut():
def write(self, payload: str):
sys.stdout.write(payload)
sys.stdout.write('\r\n')
sys.stdout.flush()
def read(self) -> str:
payload=sys.stdin.readline()
return payload
如何异步执行相同的操作?
这是一个使用 asyncio streams 将 stdin
回显到 stdout
的示例(对于 Unix)。
import asyncio
import sys
async def connect_stdin_stdout():
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
return reader, writer
async def main():
reader, writer = await connect_stdin_stdout()
while True:
res = await reader.read(100)
if not res:
break
writer.write(res)
if __name__ == "__main__":
asyncio.run(main())
作为 ready-to-use 解决方案,您可以使用 aioconsole 库。它实现了类似的方法,但也为 input
、print
、exec
和 code.interact
:
from aioconsole import get_standard_streams
async def main():
reader, writer = await get_standard_streams()
更新:
让我们试着弄清楚函数 connect_stdin_stdout
是如何工作的。
- 获取当前事件循环:
loop = asyncio.get_event_loop()
- 创建
StreamReader
个实例。
reader = asyncio.StreamReader()
通常,StreamReader/StreamWriter
classes 不打算直接实例化,只能用作 open_connection()
和 start_server()
等函数的结果。
StreamReader
为某些数据流提供缓冲的异步接口。一些源码(库代码)调用它的函数如feed_data
、feed_eof
,数据被缓冲,可以使用documented接口协程read()
、readline()
读取],等等
- 创建
StreamReaderProtocol
个实例。
protocol = asyncio.StreamReaderProtocol(reader)
这个class是从asyncio.Protocol
和FlowControlMixin
派生出来的,有助于适应Protocol
和StreamReader
。它覆盖 Protocol
方法,如 data_received
、eof_received
并调用 StreamReader
方法 feed_data
.
- 在事件循环中注册标准输入流
stdin
。
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
connect_read_pipe
函数将 pipe
对象作为 file-like 参数。 stdin
是一个 file-like 对象。从现在开始,所有从stdin
读取的数据都会落入StreamReaderProtocol
,然后传入StreamReader
- 在事件循环中注册标准输出流
stdout
。
w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)
在 connect_write_pipe
中,您需要传递一个协议工厂,该工厂创建协议实例,为 StreamWriter.drain()
实现流量控制逻辑。这个逻辑在classFlowControlMixin
中实现。也StreamReaderProtocol
继承自它。
- 创建
StreamWriter
个实例。
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
此 class 将使用函数 write()
、writelines()
等传递给它的数据转发到基础 transport
.
protocol
用于支持drain()
函数等待底层传输刷新其内部缓冲区并可再次写入的时刻。
reader
是一个可选参数,可以是None
,它也用来支持drain()
功能,在这个功能开始时检查是否有异常设置为 reader,例如,由于连接丢失(与套接字和双向连接相关),那么 drain()
也会抛出异常。
您可以在这个很棒的
StreamWriter
和 drain()
函数的更多信息
更新 2:
要读取带有 \r\n
分隔符的行,可以使用 readuntil
这是从标准输入异步读取的另一种方式(一次读取一行)。
async def async_read_stdin()->str:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, sys.stdin.readline)