如何编写具有固定输入行的 Python 终端应用程序?

How to write a Python terminal application with a fixed input line?

我正在尝试编写一个终端应用程序以通过 pyserial 与 Arduino 微控制器进行交互。以下功能很重要:

原则上,cmd 应该可以做到这一点。但是当用户开始输入时,我正在努力打印收到的消息。

为简单起见,我编写了以下测试脚本,每秒模拟传入的消息。传出的消息只是回显到带有前缀“>”的命令行:

#!/usr/bin/env python3
from cmd import Cmd
from threading import Thread
import time

class Prompt(Cmd):

    def default(self, inp):

        print('>', inp)

stop = False

def echo():
    
    while not stop:
        
        print(time.time())
        time.sleep(1)

thread = Thread(target=echo)
thread.daemon = True
thread.start()

try:
    Prompt().cmdloop()
except KeyboardInterrupt:
    stop = True
    thread.join()

在 Spyder IDE 中,结果非常完美:

但是在 iterm2 (Mac OS) 中输出非常混乱:

因为我想在 Visual Studio 代码中使用这个应用程序,所以它应该在 Spyder 之外工作。 你知道如何在 iterm2 中获得与 Spyder 中相同的行为吗?

我已经考虑或尝试过的事情:

我已经有一段时间没有用我的 Mac 与 Arduino 互动了。我使用 pyserial 并且它是 100% 可靠的。密钥是用户 read_until()。我已经包含了我的包装器 class 以供说明。 (当我没有 Arduino 时也有模拟模式)

import serial # pip install PySerial
from serial.tools import list_ports
import pty, os # for creating virtual serial interface
from serial import Serial
from typing import Optional


class SerialInterface:
    # define constants which control how class works
    FULLEMULATION=0
    SERIALEMULATION=1
    URLEMULATION=2
    FULLSOLUTION=3

    # define private class level variables
    __emulate:int = FULLEMULATION
    __ser:Serial
    __port:str = ""


    def __init__(self, emulate:int=FULLEMULATION, port:str="") -> None:
        self.__buffer:list = []
        self.__emulate = emulate
        self.__port = port

        #self.listports()

        # setup connection to COM/serial port
        # emulation sets up a virtual port,  but this has not been working
        if emulate == self.FULLSOLUTION:
            self.__ser = serial.Serial(port, 9600)
        elif emulate == self.SERIALEMULATION:
            master, slave = pty.openpty()
            serialport = os.ttyname(slave)

            self.__ser = serial.Serial(port=serialport, baudrate=9600, timeout=1)
        elif emulate == self.URLEMULATION:
            self.__ser = serial.serial_for_url("loop://")


    # useful to show COM/serial ports on a computer
    @staticmethod
    def listports() -> list:
        for p in serial.tools.list_ports.comports():
            print(p, p.device)
            serialport = p.device
        return serial.tools.list_ports.comports()

    def read_until(self, expected:bytes=b'\n', size:Optional[int]=None) -> bytes:
        if self.__emulate == self.FULLEMULATION:
            return self.__buffer.pop()
        else:
            return self.__ser.read_until(expected, size)

    # note it is important to have \n on end of every write to allow data to be read item by item
    def write(self, bytes:bytes=b'') -> None:
        if self.__emulate == self.FULLEMULATION:
            self.__buffer.append(bytes)
        else:
            self.__ser.write(bytes)

    def dataAvail(self) -> bool:
        if self.__emulate == self.FULLEMULATION:
            return len(self.__buffer) > 0
        else:
            return self.__ser.inWaiting() > 0

    def close(self) -> None:
        self.__ser.close()

    def mode(self) -> int:
        return self.__emulate




是的!我找到了一个解决方案:Prompt Toolkit 3.0 in combination with asyncio lets you handle this very problem using patch_stdout,“确保其中的打印语句不会破坏用户界面的上下文管理器”。

这是一个最小的工作示例:

#!/usr/bin/env python3
from prompt_toolkit import PromptSession
from prompt_toolkit.patch_stdout import patch_stdout
import asyncio
import time

async def echo():

    while True:
        
        print(time.time())
        await asyncio.sleep(1)

async def read():

    session = PromptSession()

    while True:
 
        with patch_stdout():
            line = await session.prompt_async("> ")
            print(line.upper())

loop = asyncio.get_event_loop()
loop.create_task(echo())
loop.create_task(read())
loop.run_forever()