异步 IO - 从输入块输出中读取字符

Async IO - reading char from input blocks output

注意:此示例是在 linux 终端模拟器上测试的,由于使用了 termios(我不知道它是否跨平台),它可能无法正常运行其他操作系统的终端。


我一直在尝试制作“异步”python 提示。我的意思是,当用户根据提示输入内容时,他们还可以接收消息,而无需取消输入。

下面是使用 asyncio.Queue 和一些 termios 标志的实现(对于复杂性提前抱歉,我尽量减少它):

import sys, termios, os
import asyncio

def readline(prompt: str = "Input: "):
    # termios stuff to: disable automatic echo so that, when a character is typed, it is not immediately printed on screen
    #                   read a single character from stdin without pressing <Enter> to finish
    fd = sys.stdin.fileno()
    orig_termios = termios.tcgetattr(fd)
    new_termios = termios.tcgetattr(fd)
    new_termios[3] &= ~(termios.ICANON | termios.ECHO)

    # set to new termios
    termios.tcsetattr(fd, termios.TCSADRAIN, new_termios)

    async def terminput(queue: asyncio.Queue):
        """Get terminal input and send it to the queue."""
        while True:
            ch = sys.stdin.read(1) # read a single char (works because of the termios config)

            if ch == "\n":
                await queue.put(("finish", None)) # `None` here because we won't use the second argument
                await asyncio.sleep(0) # strange workaround so the queues actually work
                continue

            await queue.put(("input", ch))
            await asyncio.sleep(0) # strange workaround so the queues actually work

    async def timedsender(queue: asyncio.Queue):
        """Every 0.5 seconds, send a message to the queue."""
        while True:
            await queue.put(("message", "I'm a message!"))
            await asyncio.sleep(0.5)
    
    async def receiver(queue: asyncio.Queue):
        """Handle the receiving of messages and input characters."""
        # Late decision that I might be able to fix easily - I had to use a list to push characters into on a earlier version of the code. It can be a string now, though.
        input_buffer = []

        sys.stdout.write(prompt)
        sys.stdout.flush()

        def clear_line():
            """Clear the current line.

            There might be an escape code that does this already. Eh, anyways...
            """
            sys.stdout.write("\r")
            sys.stdout.write(" " * os.get_terminal_size().columns)
            sys.stdout.write("\r")
            sys.stdout.flush()

        def redraw_input_buffer():
            """Redraw the input buffer.

            Shows the prompt and what has been typed until now.
            """
            sys.stdout.write(prompt + "".join(input_buffer))
            sys.stdout.flush()

        while True:
            # So, lemme explain what this format is.
            # Each item sent on the queue should be a tuple.
            # The first element is what should be done with the content (such as show message, add to input buffer), and the second element is the content itself.
            kind, content = await queue.get()

            if kind == "message":
                clear_line()
                sys.stdout.write(f"Message -- {content}\n")
                sys.stdout.flush()
                redraw_input_buffer()
            elif kind == "input":
                sys.stdout.write(content)
                sys.stdout.flush()
                input_buffer += content
            elif kind == "finish":
                sys.stdout.write("\n")

                sys.stdout.write(f"INPUT FINISHED :: {repr(''.join(input_buffer))}\n")
                sys.stdout.flush()
                
                input_buffer.clear()
                redraw_input_buffer()
                # continue reading more input lines...
            else:
                raise ValueError(f"Unknown kind: {repr(kind)}")

            queue.task_done()
    
    async def main():
        queue = asyncio.Queue()
    
        senders = [terminput(queue), timedsender(queue)]
        recv = receiver(queue)
        await asyncio.gather(*senders, recv)
    
        await queue.join()
        recv.cancel()

    try:
        asyncio.run(main())
    finally:
        # reset to original termios
        termios.tcsetattr(fd, termios.TCSADRAIN, orig_termios)

readline()

这里的主要问题是只有在输入字符时才会读取队列,即便如此,如果我没有等待足够的时间来读取下一个字符,比如 asyncio.sleep(0.1) , 在此期间通常只会收到一条消息。

我不确定问题是队列还是 stdin-stdout 机制的某些内部工作机制(也许我无法在 stdin 被阻塞时写入 stdout)。

刚刚想出了解决这个问题的方法 - 设置输入字符的最长等待时间。

readline()的顶部:

def readline(prompt: str = "Input: "):
    fd = sys.stdin.fileno()
    orig_termios = termios.tcgetattr(fd)
    new_termios = termios.tcgetattr(fd)
    new_termios[3] &= ~(termios.ICANON | termios.ECHO)
    
    # the following lines were added:
    new_termios[6][termios.VMIN] = 0 # minimal amount of characters to
    new_termios[6][termios.VTIME] = 1 # a max wait time of 1/10 second

当直接在 C 上使用它时,超时返回的字符将是代码 170,但这里似乎甚至没有发生(来自 Python 的读取操作可能已经忽略它们) .