异步 IO - 从输入块输出中读取字符
Async IO - reading char from input blocks output
注意:此示例是在 linux 终端模拟器上测试的,由于使用了 termios
(我不知道它是否跨平台),它可能无法正常运行其他操作系统的终端。
我一直在尝试制作“异步”python 提示。我的意思是,当用户根据提示输入内容时,他们还可以接收消息,而无需取消输入。
下面是使用 asyncio.Queue
和一些 termios
标志的实现(对于复杂性提前抱歉,我尽量减少它):
import sys, termios, os
import asyncio
def readline(prompt: str = "Input: "):
# termios stuff to: disable automatic echo so that, when a character is typed, it is not immediately printed on screen
# read a single character from stdin without pressing <Enter> to finish
fd = sys.stdin.fileno()
orig_termios = termios.tcgetattr(fd)
new_termios = termios.tcgetattr(fd)
new_termios[3] &= ~(termios.ICANON | termios.ECHO)
# set to new termios
termios.tcsetattr(fd, termios.TCSADRAIN, new_termios)
async def terminput(queue: asyncio.Queue):
"""Get terminal input and send it to the queue."""
while True:
ch = sys.stdin.read(1) # read a single char (works because of the termios config)
if ch == "\n":
await queue.put(("finish", None)) # `None` here because we won't use the second argument
await asyncio.sleep(0) # strange workaround so the queues actually work
continue
await queue.put(("input", ch))
await asyncio.sleep(0) # strange workaround so the queues actually work
async def timedsender(queue: asyncio.Queue):
"""Every 0.5 seconds, send a message to the queue."""
while True:
await queue.put(("message", "I'm a message!"))
await asyncio.sleep(0.5)
async def receiver(queue: asyncio.Queue):
"""Handle the receiving of messages and input characters."""
# Late decision that I might be able to fix easily - I had to use a list to push characters into on a earlier version of the code. It can be a string now, though.
input_buffer = []
sys.stdout.write(prompt)
sys.stdout.flush()
def clear_line():
"""Clear the current line.
There might be an escape code that does this already. Eh, anyways...
"""
sys.stdout.write("\r")
sys.stdout.write(" " * os.get_terminal_size().columns)
sys.stdout.write("\r")
sys.stdout.flush()
def redraw_input_buffer():
"""Redraw the input buffer.
Shows the prompt and what has been typed until now.
"""
sys.stdout.write(prompt + "".join(input_buffer))
sys.stdout.flush()
while True:
# So, lemme explain what this format is.
# Each item sent on the queue should be a tuple.
# The first element is what should be done with the content (such as show message, add to input buffer), and the second element is the content itself.
kind, content = await queue.get()
if kind == "message":
clear_line()
sys.stdout.write(f"Message -- {content}\n")
sys.stdout.flush()
redraw_input_buffer()
elif kind == "input":
sys.stdout.write(content)
sys.stdout.flush()
input_buffer += content
elif kind == "finish":
sys.stdout.write("\n")
sys.stdout.write(f"INPUT FINISHED :: {repr(''.join(input_buffer))}\n")
sys.stdout.flush()
input_buffer.clear()
redraw_input_buffer()
# continue reading more input lines...
else:
raise ValueError(f"Unknown kind: {repr(kind)}")
queue.task_done()
async def main():
queue = asyncio.Queue()
senders = [terminput(queue), timedsender(queue)]
recv = receiver(queue)
await asyncio.gather(*senders, recv)
await queue.join()
recv.cancel()
try:
asyncio.run(main())
finally:
# reset to original termios
termios.tcsetattr(fd, termios.TCSADRAIN, orig_termios)
readline()
这里的主要问题是只有在输入字符时才会读取队列,即便如此,如果我没有等待足够的时间来读取下一个字符,比如 asyncio.sleep(0.1)
, 在此期间通常只会收到一条消息。
我不确定问题是队列还是 stdin-stdout 机制的某些内部工作机制(也许我无法在 stdin 被阻塞时写入 stdout)。
刚刚想出了解决这个问题的方法 - 设置输入字符的最长等待时间。
在readline()
的顶部:
def readline(prompt: str = "Input: "):
fd = sys.stdin.fileno()
orig_termios = termios.tcgetattr(fd)
new_termios = termios.tcgetattr(fd)
new_termios[3] &= ~(termios.ICANON | termios.ECHO)
# the following lines were added:
new_termios[6][termios.VMIN] = 0 # minimal amount of characters to
new_termios[6][termios.VTIME] = 1 # a max wait time of 1/10 second
当直接在 C 上使用它时,超时返回的字符将是代码 170
,但这里似乎甚至没有发生(来自 Python 的读取操作可能已经忽略它们) .
注意:此示例是在 linux 终端模拟器上测试的,由于使用了 termios
(我不知道它是否跨平台),它可能无法正常运行其他操作系统的终端。
我一直在尝试制作“异步”python 提示。我的意思是,当用户根据提示输入内容时,他们还可以接收消息,而无需取消输入。
下面是使用 asyncio.Queue
和一些 termios
标志的实现(对于复杂性提前抱歉,我尽量减少它):
import sys, termios, os
import asyncio
def readline(prompt: str = "Input: "):
# termios stuff to: disable automatic echo so that, when a character is typed, it is not immediately printed on screen
# read a single character from stdin without pressing <Enter> to finish
fd = sys.stdin.fileno()
orig_termios = termios.tcgetattr(fd)
new_termios = termios.tcgetattr(fd)
new_termios[3] &= ~(termios.ICANON | termios.ECHO)
# set to new termios
termios.tcsetattr(fd, termios.TCSADRAIN, new_termios)
async def terminput(queue: asyncio.Queue):
"""Get terminal input and send it to the queue."""
while True:
ch = sys.stdin.read(1) # read a single char (works because of the termios config)
if ch == "\n":
await queue.put(("finish", None)) # `None` here because we won't use the second argument
await asyncio.sleep(0) # strange workaround so the queues actually work
continue
await queue.put(("input", ch))
await asyncio.sleep(0) # strange workaround so the queues actually work
async def timedsender(queue: asyncio.Queue):
"""Every 0.5 seconds, send a message to the queue."""
while True:
await queue.put(("message", "I'm a message!"))
await asyncio.sleep(0.5)
async def receiver(queue: asyncio.Queue):
"""Handle the receiving of messages and input characters."""
# Late decision that I might be able to fix easily - I had to use a list to push characters into on a earlier version of the code. It can be a string now, though.
input_buffer = []
sys.stdout.write(prompt)
sys.stdout.flush()
def clear_line():
"""Clear the current line.
There might be an escape code that does this already. Eh, anyways...
"""
sys.stdout.write("\r")
sys.stdout.write(" " * os.get_terminal_size().columns)
sys.stdout.write("\r")
sys.stdout.flush()
def redraw_input_buffer():
"""Redraw the input buffer.
Shows the prompt and what has been typed until now.
"""
sys.stdout.write(prompt + "".join(input_buffer))
sys.stdout.flush()
while True:
# So, lemme explain what this format is.
# Each item sent on the queue should be a tuple.
# The first element is what should be done with the content (such as show message, add to input buffer), and the second element is the content itself.
kind, content = await queue.get()
if kind == "message":
clear_line()
sys.stdout.write(f"Message -- {content}\n")
sys.stdout.flush()
redraw_input_buffer()
elif kind == "input":
sys.stdout.write(content)
sys.stdout.flush()
input_buffer += content
elif kind == "finish":
sys.stdout.write("\n")
sys.stdout.write(f"INPUT FINISHED :: {repr(''.join(input_buffer))}\n")
sys.stdout.flush()
input_buffer.clear()
redraw_input_buffer()
# continue reading more input lines...
else:
raise ValueError(f"Unknown kind: {repr(kind)}")
queue.task_done()
async def main():
queue = asyncio.Queue()
senders = [terminput(queue), timedsender(queue)]
recv = receiver(queue)
await asyncio.gather(*senders, recv)
await queue.join()
recv.cancel()
try:
asyncio.run(main())
finally:
# reset to original termios
termios.tcsetattr(fd, termios.TCSADRAIN, orig_termios)
readline()
这里的主要问题是只有在输入字符时才会读取队列,即便如此,如果我没有等待足够的时间来读取下一个字符,比如 asyncio.sleep(0.1)
, 在此期间通常只会收到一条消息。
我不确定问题是队列还是 stdin-stdout 机制的某些内部工作机制(也许我无法在 stdin 被阻塞时写入 stdout)。
刚刚想出了解决这个问题的方法 - 设置输入字符的最长等待时间。
在readline()
的顶部:
def readline(prompt: str = "Input: "):
fd = sys.stdin.fileno()
orig_termios = termios.tcgetattr(fd)
new_termios = termios.tcgetattr(fd)
new_termios[3] &= ~(termios.ICANON | termios.ECHO)
# the following lines were added:
new_termios[6][termios.VMIN] = 0 # minimal amount of characters to
new_termios[6][termios.VTIME] = 1 # a max wait time of 1/10 second
当直接在 C 上使用它时,超时返回的字符将是代码 170
,但这里似乎甚至没有发生(来自 Python 的读取操作可能已经忽略它们) .