如何在 Python 中读取当前正在控制台里输入的一行内容?
How to read a line from python currently in console?
我想使用一行代码来读取我在控制台中输入的内容,以便与 python 中的 asyncio
模块一起使用。我的代码在从服务器接收到数据时打印数据,然后,我希望它读取我输入的内容并将其保存到变量中。我可以使用非标准模块。
程序信息:
- 当前使用 asyncio 非阻塞读取 stdin loop.run_in_executor
- 程序使用 loop.create_connection 运行,然后使用 loop.run_forever 并将标准输入 reader 添加为任务。
代码:
#!python3.5
#chatroom client
import socket, sys, os, traceback, asyncio
from threading import Lock
# Verbose-error switch: when true, failures are reported with a full traceback.
DBG = False

# --- asyncio plumbing ---
loop = asyncio.get_event_loop()
lock = Lock()
prev_stdin = ""  # pending typed text that write() prepends to the next line

# --- server location ---
target = 'localhost'
port = 17532
buffer_size = 1024
server = None
transports = []  # asyncio transports that typed lines are sent to


# --- error-handling helpers ---
def full_error():
    """Print the traceback of the exception currently being handled."""
    return traceback.print_exception(*sys.exc_info())


def pause():
    """Do nothing; placeholder that the platform-specific version replaces."""
    return None
#Pause system compatability
# Pick a "wait for the user" implementation that suits the current platform.
if os.name.startswith('posix'):
    def pause():
        """Wait for one key press via the shell's ``read`` command."""
        os.system('read -n1 -r -p "Press any key to continue . . ." key')
elif os.name.startswith('nt'):
    def pause():
        """Wait for a key press via the Windows ``pause`` command."""
        os.system("pause")
else:
    def pause():
        """Fallback: wait until the user presses Enter."""
        input("Press enter to continue . . .")
def empty_stdin():
    """Read everything remaining on standard input and return it as text.

    The binary layer of ``sys.stdin`` is read until it reports end-of-file
    (an empty result); the collected bytes are then decoded with the
    encoding of ``sys.stdin`` (UTF-8 when that is not set).

    Returns:
        str: the decoded data, or ``""`` when nothing was read.

    Note:
        ``read`` blocks until data or end-of-file arrives, so on an
        interactive console this call does not return until input is closed.
    """
    chunks = []
    while True:
        chunk = sys.stdin.buffer.read(1)
        # buffer.read() yields bytes, so end-of-file is b"" (falsy). The old
        # comparison against the str "" was never true, and appending bytes
        # to a str raised TypeError.
        if not chunk:
            break
        chunks.append(chunk)
    encoding = getattr(sys.stdin, "encoding", None) or 'utf-8'
    return b"".join(chunks).decode(encoding)
# Turn on the event loop's debug mode only when verbose diagnostics are wanted.
if DBG:
    loop.set_debug(enabled=DBG)
async def write():
    """Forward each line typed on stdin to every connected transport.

    ``sys.stdin.readline`` blocks, so it is run on the default executor to
    keep the event loop responsive. Any text stored in the module-level
    ``prev_stdin`` is prepended to the line and then cleared. The result is
    UTF-8 encoded and scheduled for writing on each entry of ``transports``.
    The coroutine ends when stdin reaches end-of-file.
    """
    # prev_stdin is reassigned below; without this declaration Python treats
    # it as a local and the first read raises UnboundLocalError.
    global prev_stdin
    loop = asyncio.get_event_loop()
    while True:
        typed = await loop.run_in_executor(None, sys.stdin.readline)
        if not typed:
            # readline() returns "" at end-of-file; stop instead of spinning
            # forever and scheduling empty writes.
            break
        line = prev_stdin + typed
        prev_stdin = ""
        payload = line.encode('utf-8')
        for transport in transports:
            loop.call_soon(transport.write, payload)
class Server(asyncio.Protocol):
    """Protocol for the chat-server connection.

    Sends a greeting when the connection is established and prints every
    chunk of data the server sends.
    """

    def connection_made(self, transport):
        """Keep a reference to *transport* and send ``b"Connected"``."""
        self.transport = transport
        transport.write(b"Connected")

    def data_received(self, data):
        """Print *data* decoded as UTF-8, then capture pending stdin text.

        The captured text is stored in the module-level ``prev_stdin`` and
        printed back.
        """
        # The assignment below previously created a function-local name, so
        # the module-level prev_stdin was never updated.
        global prev_stdin
        print("\n%s" % data.decode('utf-8'))
        # NOTE(review): empty_stdin() is defined elsewhere in this file and
        # appears to read until end-of-file, which would stall the event
        # loop while it waits - confirm.
        prev_stdin = empty_stdin()
        print(prev_stdin)
# Script entry point: connect to the chat server, start the stdin forwarder
# and run the event loop until it is stopped.
# NOTE(review): the flattened source does not show whether the trailing
# pause()/sys.exit() sit inside the except handler or after the whole try
# statement; they are placed after it here - confirm.
try:
    connecting = loop.create_connection(Server, host=target, port=port)
    task = loop.create_task(connecting)
    print("hi")
    print(transports)
    server, serverp = loop.run_until_complete(task)
    transports.append(server)
    user_input = loop.create_task(write())
    loop.run_forever()
    print('end')
except Exception as e:
    if not DBG:
        # Short report: exception type followed by its message.
        print(type(e))
        print(e)
    else:
        full_error()
pause()
sys.exit()
我通过使用 msvcrt 逐个捕获按键字符,并对代码做了一些改动,解决了这个问题(仅适用于 Windows)。
新增函数/改动:
draw_screen:
- 使用 os.system("cls") 清除屏幕
- 打印我在输出信息时保存下来的各行(可能会有延迟)
- 打印一个提示符以及我当前正在输入的内容 (">> Hello")
write:
- 用 msvcrt.getch 替换了 sys.stdin.readline
- 以及其他一些改动
所有位置:
- 每当需要打印时,改为把内容添加到 screen 变量(一个保存全部输出内容的数组),然后调用 draw_screen()
- 删除了与 prev_stdin 相关的代码
- 删除了其他一些 print 语句
最终代码(目前):
#!python3.5
#chatroom client
import sys, os, traceback, asyncio, msvcrt
from threading import Lock
# Verbose-error switch: when true, failures are reported with a full traceback.
DBG = False
screen = []  # lines of text already shown; reprinted on every redraw
line = ""  # text the user is currently typing

# --- asyncio plumbing ---
loop = asyncio.get_event_loop()
lock = Lock()

# --- server location ---
target = 'localhost'
port = 17532
buffer_size = 1024
server = None
transports = []  # asyncio transports that typed lines are sent to


# --- error-handling helper ---
def full_error():
    """Print the traceback of the exception currently being handled."""
    return traceback.print_exception(*sys.exc_info())
#system compatable functions
# Pick "wait for the user" and "clear the screen" helpers for this platform.
if os.name.startswith('posix'):
    def pause():
        """Wait for one key press via the shell's ``read`` command."""
        os.system('read -n1 -r -p "Press any key to continue . . ." key')

    def cls():
        """Clear the terminal with the ``clear`` command."""
        os.system("clear")
elif os.name.startswith('nt'):
    def pause():
        """Wait for a key press via the Windows ``pause`` command."""
        os.system("pause")

    def cls():
        """Clear the console with the Windows ``cls`` command."""
        os.system("cls")
else:
    def pause():
        """Fallback: wait until the user presses Enter."""
        input("Press enter to continue . . .")

    def cls():
        """Fallback: clear the terminal with the ``clear`` command."""
        os.system("clear")
def draw_screen():
    """Clear the screen, reprint every stored line, then show the input line.

    The prompt line ends with a carriage return instead of a newline, so the
    cursor goes back to the start of that line.
    """
    cls()
    if screen:
        # One print call that emits each stored entry on its own line.
        print(*screen, sep="\n")
    # Current line of text being typed.
    print("> ", line, sep=" ", end="\r")
# Turn on the event loop's debug mode only when verbose diagnostics are wanted.
if DBG:
    loop.set_debug(enabled=DBG)
async def write():
    """Read key presses one at a time and maintain the typed line.

    Each key is fetched with ``msvcrt.getch`` on the default executor so the
    event loop is not blocked. Backspace removes the last character. Enter
    (``"\\r"`` or ``"\\n"``) schedules the UTF-8 encoded line for writing on
    every entry of ``transports`` and clears it. Any other key is appended
    to the module-level ``line``. The screen is redrawn after each key.
    """
    global line
    while True:
        key = await loop.run_in_executor(None, msvcrt.getch)
        if key in (b"\x00", b"\xe0"):
            # Function and arrow keys arrive as a prefix byte followed by a
            # key code. Neither is text, and b"\xe0" is not valid UTF-8, so
            # decoding it raised UnicodeDecodeError and ended this task.
            # Consume the key code and ignore the key.
            await loop.run_in_executor(None, msvcrt.getch)
            continue
        # errors='replace' keeps other non-UTF-8 bytes from ending the task.
        char = key.decode('utf-8', errors='replace')
        if char == "\b":
            line = line[:-1]
        elif char in ("\r", "\n"):
            payload = line.encode('utf-8')
            for transport in transports:
                loop.call_soon(transport.write, payload)
            line = ""
        else:
            line += char
        draw_screen()
class Server(asyncio.Protocol):
    """Protocol for the chat-server connection.

    Sends a greeting on connect and appends incoming data to the screen
    buffer.
    """

    def connection_made(self, transport):
        """Keep a reference to *transport* and send ``b"Connected"``."""
        self.transport = transport
        transport.write(b"Connected")

    def data_received(self, data):
        """Decode *data* as UTF-8, store it in ``screen`` and redraw."""
        received = data.decode('utf-8')
        screen.append(received)
        draw_screen()
# Script entry point: connect to the chat server, start the keyboard reader
# and run the event loop until it is stopped.
# NOTE(review): the flattened source does not show whether the trailing
# pause()/sys.exit() sit inside the except handler or after the whole try
# statement; they are placed after it here - confirm.
try:
    connecting = loop.create_connection(Server, host=target, port=port)
    task = loop.create_task(connecting)
    server, serverp = loop.run_until_complete(task)
    transports.append(server)
    user_input = loop.create_task(write())
    loop.run_forever()
    print('end')
except Exception as e:
    if not DBG:
        # Short report: exception type followed by its message.
        print(type(e))
        print(e)
    else:
        full_error()
pause()
sys.exit()
我想使用一行代码来读取我在控制台中输入的内容,以便与 python 中的 asyncio
模块一起使用。我的代码在从服务器接收到数据时打印数据,然后,我希望它读取我输入的内容并将其保存到变量中。我可以使用非标准模块。
程序信息:
- 当前使用 asyncio 非阻塞读取 stdin loop.run_in_executor
- 程序使用 loop.create_connection 运行,然后使用 loop.run_forever 并将标准输入 reader 添加为任务。
代码:
#!python3.5
#chatroom client
import socket, sys, os, traceback, asyncio
from threading import Lock
# Verbose-error switch: when true, failures are reported with a full traceback.
DBG = False

# --- asyncio plumbing ---
loop = asyncio.get_event_loop()
lock = Lock()
prev_stdin = ""  # pending typed text that write() prepends to the next line

# --- server location ---
target = 'localhost'
port = 17532
buffer_size = 1024
server = None
transports = []  # asyncio transports that typed lines are sent to


# --- error-handling helpers ---
def full_error():
    """Print the traceback of the exception currently being handled."""
    return traceback.print_exception(*sys.exc_info())


def pause():
    """Do nothing; placeholder that the platform-specific version replaces."""
    return None
#Pause system compatability
# Pick a "wait for the user" implementation that suits the current platform.
if os.name.startswith('posix'):
    def pause():
        """Wait for one key press via the shell's ``read`` command."""
        os.system('read -n1 -r -p "Press any key to continue . . ." key')
elif os.name.startswith('nt'):
    def pause():
        """Wait for a key press via the Windows ``pause`` command."""
        os.system("pause")
else:
    def pause():
        """Fallback: wait until the user presses Enter."""
        input("Press enter to continue . . .")
def empty_stdin():
    """Read everything remaining on standard input and return it as text.

    The binary layer of ``sys.stdin`` is read until it reports end-of-file
    (an empty result); the collected bytes are then decoded with the
    encoding of ``sys.stdin`` (UTF-8 when that is not set).

    Returns:
        str: the decoded data, or ``""`` when nothing was read.

    Note:
        ``read`` blocks until data or end-of-file arrives, so on an
        interactive console this call does not return until input is closed.
    """
    chunks = []
    while True:
        chunk = sys.stdin.buffer.read(1)
        # buffer.read() yields bytes, so end-of-file is b"" (falsy). The old
        # comparison against the str "" was never true, and appending bytes
        # to a str raised TypeError.
        if not chunk:
            break
        chunks.append(chunk)
    encoding = getattr(sys.stdin, "encoding", None) or 'utf-8'
    return b"".join(chunks).decode(encoding)
# Turn on the event loop's debug mode only when verbose diagnostics are wanted.
if DBG:
    loop.set_debug(enabled=DBG)
async def write():
    """Forward each line typed on stdin to every connected transport.

    ``sys.stdin.readline`` blocks, so it is run on the default executor to
    keep the event loop responsive. Any text stored in the module-level
    ``prev_stdin`` is prepended to the line and then cleared. The result is
    UTF-8 encoded and scheduled for writing on each entry of ``transports``.
    The coroutine ends when stdin reaches end-of-file.
    """
    # prev_stdin is reassigned below; without this declaration Python treats
    # it as a local and the first read raises UnboundLocalError.
    global prev_stdin
    loop = asyncio.get_event_loop()
    while True:
        typed = await loop.run_in_executor(None, sys.stdin.readline)
        if not typed:
            # readline() returns "" at end-of-file; stop instead of spinning
            # forever and scheduling empty writes.
            break
        line = prev_stdin + typed
        prev_stdin = ""
        payload = line.encode('utf-8')
        for transport in transports:
            loop.call_soon(transport.write, payload)
class Server(asyncio.Protocol):
    """Protocol for the chat-server connection.

    Sends a greeting when the connection is established and prints every
    chunk of data the server sends.
    """

    def connection_made(self, transport):
        """Keep a reference to *transport* and send ``b"Connected"``."""
        self.transport = transport
        transport.write(b"Connected")

    def data_received(self, data):
        """Print *data* decoded as UTF-8, then capture pending stdin text.

        The captured text is stored in the module-level ``prev_stdin`` and
        printed back.
        """
        # The assignment below previously created a function-local name, so
        # the module-level prev_stdin was never updated.
        global prev_stdin
        print("\n%s" % data.decode('utf-8'))
        # NOTE(review): empty_stdin() is defined elsewhere in this file and
        # appears to read until end-of-file, which would stall the event
        # loop while it waits - confirm.
        prev_stdin = empty_stdin()
        print(prev_stdin)
# Script entry point: connect to the chat server, start the stdin forwarder
# and run the event loop until it is stopped.
# NOTE(review): the flattened source does not show whether the trailing
# pause()/sys.exit() sit inside the except handler or after the whole try
# statement; they are placed after it here - confirm.
try:
    connecting = loop.create_connection(Server, host=target, port=port)
    task = loop.create_task(connecting)
    print("hi")
    print(transports)
    server, serverp = loop.run_until_complete(task)
    transports.append(server)
    user_input = loop.create_task(write())
    loop.run_forever()
    print('end')
except Exception as e:
    if not DBG:
        # Short report: exception type followed by its message.
        print(type(e))
        print(e)
    else:
        full_error()
pause()
sys.exit()
我通过使用 msvcrt 逐个捕获按键字符,并对代码做了一些改动,解决了这个问题(仅适用于 Windows)。
新增函数/改动:
draw_screen:
- 使用 os.system("cls") 清除屏幕
- 打印我在输出信息时保存下来的各行(可能会有延迟)
- 打印一个提示符以及我当前正在输入的内容 (">> Hello")
write:
- 用 msvcrt.getch 替换了 sys.stdin.readline
- 以及其他一些改动
所有位置:
- 每当需要打印时,改为把内容添加到 screen 变量(一个保存全部输出内容的数组),然后调用 draw_screen()
- 删除了与 prev_stdin 相关的代码
- 删除了其他一些 print 语句
最终代码(目前):
#!python3.5
#chatroom client
import sys, os, traceback, asyncio, msvcrt
from threading import Lock
# Verbose-error switch: when true, failures are reported with a full traceback.
DBG = False
screen = []  # lines of text already shown; reprinted on every redraw
line = ""  # text the user is currently typing

# --- asyncio plumbing ---
loop = asyncio.get_event_loop()
lock = Lock()

# --- server location ---
target = 'localhost'
port = 17532
buffer_size = 1024
server = None
transports = []  # asyncio transports that typed lines are sent to


# --- error-handling helper ---
def full_error():
    """Print the traceback of the exception currently being handled."""
    return traceback.print_exception(*sys.exc_info())
#system compatable functions
# Pick "wait for the user" and "clear the screen" helpers for this platform.
if os.name.startswith('posix'):
    def pause():
        """Wait for one key press via the shell's ``read`` command."""
        os.system('read -n1 -r -p "Press any key to continue . . ." key')

    def cls():
        """Clear the terminal with the ``clear`` command."""
        os.system("clear")
elif os.name.startswith('nt'):
    def pause():
        """Wait for a key press via the Windows ``pause`` command."""
        os.system("pause")

    def cls():
        """Clear the console with the Windows ``cls`` command."""
        os.system("cls")
else:
    def pause():
        """Fallback: wait until the user presses Enter."""
        input("Press enter to continue . . .")

    def cls():
        """Fallback: clear the terminal with the ``clear`` command."""
        os.system("clear")
def draw_screen():
    """Clear the screen, reprint every stored line, then show the input line.

    The prompt line ends with a carriage return instead of a newline, so the
    cursor goes back to the start of that line.
    """
    cls()
    if screen:
        # One print call that emits each stored entry on its own line.
        print(*screen, sep="\n")
    # Current line of text being typed.
    print("> ", line, sep=" ", end="\r")
# Turn on the event loop's debug mode only when verbose diagnostics are wanted.
if DBG:
    loop.set_debug(enabled=DBG)
async def write():
    """Read key presses one at a time and maintain the typed line.

    Each key is fetched with ``msvcrt.getch`` on the default executor so the
    event loop is not blocked. Backspace removes the last character. Enter
    (``"\\r"`` or ``"\\n"``) schedules the UTF-8 encoded line for writing on
    every entry of ``transports`` and clears it. Any other key is appended
    to the module-level ``line``. The screen is redrawn after each key.
    """
    global line
    while True:
        key = await loop.run_in_executor(None, msvcrt.getch)
        if key in (b"\x00", b"\xe0"):
            # Function and arrow keys arrive as a prefix byte followed by a
            # key code. Neither is text, and b"\xe0" is not valid UTF-8, so
            # decoding it raised UnicodeDecodeError and ended this task.
            # Consume the key code and ignore the key.
            await loop.run_in_executor(None, msvcrt.getch)
            continue
        # errors='replace' keeps other non-UTF-8 bytes from ending the task.
        char = key.decode('utf-8', errors='replace')
        if char == "\b":
            line = line[:-1]
        elif char in ("\r", "\n"):
            payload = line.encode('utf-8')
            for transport in transports:
                loop.call_soon(transport.write, payload)
            line = ""
        else:
            line += char
        draw_screen()
class Server(asyncio.Protocol):
    """Protocol for the chat-server connection.

    Sends a greeting on connect and appends incoming data to the screen
    buffer.
    """

    def connection_made(self, transport):
        """Keep a reference to *transport* and send ``b"Connected"``."""
        self.transport = transport
        transport.write(b"Connected")

    def data_received(self, data):
        """Decode *data* as UTF-8, store it in ``screen`` and redraw."""
        received = data.decode('utf-8')
        screen.append(received)
        draw_screen()
# Script entry point: connect to the chat server, start the keyboard reader
# and run the event loop until it is stopped.
# NOTE(review): the flattened source does not show whether the trailing
# pause()/sys.exit() sit inside the except handler or after the whole try
# statement; they are placed after it here - confirm.
try:
    connecting = loop.create_connection(Server, host=target, port=port)
    task = loop.create_task(connecting)
    server, serverp = loop.run_until_complete(task)
    transports.append(server)
    user_input = loop.create_task(write())
    loop.run_forever()
    print('end')
except Exception as e:
    if not DBG:
        # Short report: exception type followed by its message.
        print(type(e))
        print(e)
    else:
        full_error()
pause()
sys.exit()