如何在 Python 中读取当前在控制台里输入的一行内容?

How to read a line from python currently in console?

我想使用一行代码来读取我在控制台中输入的内容,以便与 python 中的 asyncio 模块一起使用。我的代码在从服务器接收到数据时打印数据,然后,我希望它读取我输入的内容并将其保存到变量中。我可以使用非标准模块。

程序信息:

代码:

#!python3.5
#chatroom client

import socket, sys, os, traceback, asyncio
from threading import Lock

DBG = False #More Error printing using full_error function

#async stuff
# Create the loop explicitly instead of asyncio.get_event_loop(): calling
# get_event_loop() with no running loop is deprecated and raises RuntimeError
# on the newest Python releases. set_event_loop() keeps later
# asyncio.get_event_loop() calls returning this same loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
lock = Lock()
prev_stdin = "" #text typed before a server message interrupted the input line

#Server Location
target = 'localhost'
port = 17532
buffer_size = 1024
server = None #set to the connected transport once the connection is made
transports = [] #list of asyncio transports that typed lines are sent to

#Functions for Error handling
def full_error():
    """Print the full traceback of the exception currently being handled."""
    traceback.print_exception(*sys.exc_info())


def pause():
    """Do nothing; placeholder that the platform-specific code below replaces."""
    return None

#Pause system compatibility
# Pick the "press a key" implementation that matches the running platform.
# NOTE(review): os.system() runs the command through the system shell; the
# -n1 / -p options of `read` are not available in every shell - confirm on
# the target systems.
if os.name.startswith('posix'):
    def pause():
        """Wait for one key press using the shell's `read` builtin."""
        os.system('read -n1 -r -p "Press any key to continue . . ." key')
elif os.name.startswith('nt'):
    def pause():
        """Wait for a key press using the Windows `pause` command."""
        os.system("pause")
else:
    def pause():
        """Fallback for other platforms: wait for the user to press enter."""
        input("Press enter to continue . . .")

def empty_stdin():
    """Read everything left on standard input and return it as text.

    The data is read from stdin's underlying binary buffer and decoded in one
    go, so multi-byte characters are never split. Decoding uses stdin's own
    encoding (UTF-8 if it reports none); undecodable bytes are replaced
    rather than raising.

    Note: like the byte-by-byte loop it replaces, this blocks until stdin
    reaches end-of-file.

    Returns:
        str: the decoded remaining input ("" if stdin is already at EOF).
    """
    # sys.stdin.buffer yields bytes, so the data has to be decoded before it
    # can be combined with str values elsewhere in the program.
    raw = sys.stdin.buffer.read()
    encoding = getattr(sys.stdin, "encoding", None) or "utf-8"
    return raw.decode(encoding, errors="replace")

# Only touch the event loop's debug setting when DBG is set; otherwise the
# loop keeps whatever debug mode it already had.
if DBG:
    loop.set_debug(DBG)

async def write():
    """Forward every line typed on stdin to all connected transports.

    Each line is read in the loop's default executor so the blocking
    readline() call does not stall the event loop. Any text saved in the
    module-level ``prev_stdin`` is prepended to the line and then cleared.
    The coroutine returns when stdin reaches end-of-file.
    """
    # prev_stdin is reassigned below; without this declaration Python treats
    # it as a local and the first read of it raises UnboundLocalError.
    global prev_stdin
    loop = asyncio.get_event_loop()
    while True:
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line:
            # readline() returns "" only at EOF; stop instead of spinning
            # forever and sending empty payloads.
            break
        line = prev_stdin + line
        prev_stdin = ""
        payload = line.encode('utf-8') # encode once, not once per transport
        for transport in transports:
            loop.call_soon(transport.write, payload)

class Server(asyncio.Protocol):
    """Protocol for the connection to the chat server.

    Greets the server on connect and prints every message it receives.
    """

    def connection_made(self,transport):
        """Remember the transport and send the initial greeting."""
        self.transport = transport
        transport.write(b"Connected")

    def data_received(self, data):
        """Print a received message, then re-echo the user's pending input.

        The pending input is stored in the module-level ``prev_stdin`` so
        that write() can prepend it to the next line the user submits.
        """
        # Without the global declaration the assignment below only created a
        # local, so write() never saw the saved text.
        global prev_stdin
        print("\n%s"%data.decode('utf-8'))
        # NOTE(review): empty_stdin() reads stdin synchronously inside an
        # event-loop callback, so the loop is stalled while it runs - confirm
        # this is acceptable.
        prev_stdin = empty_stdin()
        # The trailing comma in print(prev_stdin,) does nothing in Python 3;
        # suppress the newline explicitly so the user can keep typing on the
        # same line.
        print(prev_stdin, end="", flush=True)



# Script entry: connect to the server, start the stdin-forwarding task and
# run the event loop until it is stopped.
try:
    coro = loop.create_connection(Server, host=target, port=port)
    task = loop.create_task(coro)
    print("hi")
    print(transports)
    # Block until the connection is established; yields (transport, protocol).
    server, serverp = loop.run_until_complete(task)
    transports.append(server)
    user_input = loop.create_task(write())
    #loop.add_reader(sys.stdin, write)
    loop.run_forever()
    print('end')

except Exception as e:
    # Report the failure (full traceback only in debug mode), wait for the
    # user to acknowledge it, then exit.
    if not DBG:
        print(type(e))
        print(e)
    else:
        full_error()
    pause()
    sys.exit()

我通过使用 msvcrt 捕捉单个字符和一些代码更改解决了问题(仅 Windows)。

新增的函数/改动

  1. draw_screen:

    • 使用 os.system("cls") 清除屏幕
    • 打印我在打印信息时保存的行(可能会滞后)
    • 用我当前输入的内容打印一个光标 (">> Hello")
  2. write:

    • 用 msvcrt.getch 替换了 sys.stdin.readline
    • 更多变化
  3. 无处不在:

    • 每当需要打印内容时,不再直接打印,而是把它添加到 screen 变量(一个保存所有输出行的数组)中,然后调用 draw_screen()
    • 删除了与 prev_stdin 相关的代码
    • 删除了一些其他打印语句

最终代码(目前):

#!python3.5
#chatroom client

import sys, os, traceback, asyncio, msvcrt
from threading import Lock

DBG = False #More Error printing using full_error function

screen = [] #holds lines of text that are printed to the screen
line = "" #the text the user is currently typing

#async stuff
# Create the loop explicitly instead of asyncio.get_event_loop(): calling
# get_event_loop() with no running loop is deprecated and raises RuntimeError
# on the newest Python releases. set_event_loop() keeps later
# asyncio.get_event_loop() calls returning this same loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
lock = Lock()


#Server Location
target = 'localhost'
port = 17532
buffer_size = 1024
server = None #set to the connected transport once the connection is made
transports = [] #list of asyncio transports that typed lines are sent to

#Functions for Error handling
def full_error():
    """Print the full traceback of the exception currently being handled."""
    traceback.print_exception(*sys.exc_info())

#system compatible functions
# Define pause() ("press a key") and cls() (clear the console) for the
# platform the script is running on.
# NOTE(review): os.system() runs the command through the system shell; the
# -n1 / -p options of `read` are not available in every shell - confirm on
# the target systems.
if os.name.startswith('posix'):
    def pause():
        """Wait for one key press using the shell's `read` builtin."""
        os.system('read -n1 -r -p "Press any key to continue . . ." key')

    def cls():
        """Clear the terminal with the `clear` command."""
        os.system("clear")
elif os.name.startswith('nt'):
    def pause():
        """Wait for a key press using the Windows `pause` command."""
        os.system("pause")

    def cls():
        """Clear the console with the Windows `cls` command."""
        os.system("cls")
else:
    def pause():
        """Fallback for other platforms: wait for the user to press enter."""
        input("Press enter to continue . . .")

    def cls():
        """Fallback for other platforms: try the `clear` command."""
        os.system("clear")

def draw_screen():
    """Redraw the whole console: stored lines first, then the input prompt.

    Clears the screen, prints every entry of the module-level ``screen``
    list on its own line, and finally prints the prompt followed by the
    text currently being typed (``line``). The prompt ends with a carriage
    return instead of a newline.
    """
    cls()
    if screen:
        # One print call; emits the same text as printing each entry in turn.
        print(*screen, sep="\n")
    print("> ", line, sep=" ", end="\r") #Draws current line of text being typed

# Only touch the event loop's debug setting when DBG is set; otherwise the
# loop keeps whatever debug mode it already had.
if DBG:
    loop.set_debug(DBG)

def _read_key():
    """Block until a key is pressed; return it as str, or None for special keys.

    msvcrt.getwch() returns the key as a text character, so non-ASCII input
    does not have to be decoded byte by byte. Function and arrow keys arrive
    as a '\\x00' or '\\xe0' prefix followed by a key code; that pair is
    consumed and reported as None.
    """
    key = msvcrt.getwch()
    # NOTE(review): '\xe0' is also a printable character; kbhit() is used to
    # tell a special-key prefix (key code already pending) from a typed
    # character - confirm this holds on the target consoles.
    if key in ("\x00", "\xe0") and msvcrt.kbhit():
        msvcrt.getwch() # discard the key code of the special key
        return None
    return key


async def write():
    """Read key presses, edit the current input line and send it on Enter.

    Keys are read in the loop's default executor so the blocking console
    read does not stall the event loop (Windows only, via msvcrt).
    Backspace removes the last character, Enter sends the line (UTF-8
    encoded) to every transport and clears it, special keys are ignored and
    any other key is appended. The screen is redrawn after every handled key.
    """
    global line
    while True:
        key = await loop.run_in_executor(None, _read_key)
        if key is None:
            # Arrow/function key: nothing to add to the line.
            continue
        if key == "\b":
            line = line[:-1]
        elif key in ("\r", "\n"):
            payload = line.encode('utf-8') # encode once, not once per transport
            for transport in transports:
                loop.call_soon(transport.write, payload)
            line = ""
        else:
            line += key
        draw_screen()

class Server(asyncio.Protocol):
    """Protocol for the connection to the chat server.

    Greets the server on connect and appends every received message to the
    on-screen history.
    """

    def connection_made(self,transport):
        """Remember the transport and send the initial greeting."""
        self.transport = transport
        transport.write(b"Connected")

    def data_received(self, data):
        """Decode a received message, store it in ``screen`` and redraw."""
        message = data.decode('utf-8')
        screen.append(message)
        draw_screen()



# Script entry: connect to the server, start the keyboard-input task and run
# the event loop until it is stopped.
try:
    coro = loop.create_connection(Server, host=target, port=port)
    task = loop.create_task(coro)
    # Block until the connection is established; yields (transport, protocol).
    server, serverp = loop.run_until_complete(task)
    transports.append(server)
    user_input = loop.create_task(write())
    loop.run_forever()
    print('end')

except Exception as e:
    # Report the failure (full traceback only in debug mode), wait for the
    # user to acknowledge it, then exit.
    if not DBG:
        print(type(e))
        print(e)
    else:
        full_error()
    pause()
    sys.exit()