如何从使用屏幕重绘的程序获取输出以用于终端屏幕抓取工具?
How do I obtain the output from a program that uses screen redrawing for use in a terminal screen scraper?
我正在尝试获取全屏终端程序的输出,该程序使用重绘转义码来呈现数据,并且需要 tty
(或 pty
)到 运行.
人类遵循的基本程序是:
- 在终端中启动程序。
- 程序使用重绘来显示和更新各个字段的数据。
- 人类等待直到显示一致(可能使用 "it's not flickering" 或 "it's been 0.5s since the last update" 等提示)。
- 人类看着特定位置的字段并记住或记录数据。
- 人退出程序。
- 然后人类根据该数据在程序外执行操作。
我想自动化这个过程。第 4 步和第 5 步可以按任意顺序完成。虽然我这个完美主义者担心屏幕状态的自洽性,但我承认我不太确定如何正确定义它(除了可能使用 "it's been more than a certain timeout period since the last update")。
似乎使用 pty
和 subprocess
后跟某种屏幕抓取工具是一种可能的方法,但我不清楚如何将它们一起使用,并且我正在使用的一些较低级别的对象存在哪些危险。
考虑这个程序:
#!/usr/bin/env python2
import os
import pty
import subprocess
import time
import pexpect.ANSI
# Psuedo-terminal FDs
fd_master, fd_slave = pty.openpty()
# Start 'the_program'
the_proc = subprocess.Popen(['the_program'], stdin=fd_master, stdout=fd_slave, stderr=fd_slave)
# Just kill it after a couple of seconds
time.sleep(2)
the_proc.terminate()
# Read output into a buffer
output_buffer = b''
read_size = None
while (read_size is None) or (read_size > 0):
chunk = os.read(fd_master, 1024)
output_buffer += chunk
read_size = len(chunk)
print("output buffer size: {:d}".format(len(output_buffer)))
# Feed output to screen scraper
ansi_term = pexpect.ANSI.ANSI(24, 80)
ansi_term.write(output_buffer)
# Parse presented data...
一个问题是 os.read()
调用总是阻塞。我还想知道是否有更好的方法来获取 pty
输出以供进一步使用。具体来说:
- 有没有办法用更高级别的代码来做到这一点(或其中的一部分)?我不能只将
subprocess.PIPE
用于我的 Popen
调用,因为这样目标程序将无法运行。但是我可以用一些更方便的方法将这些文件描述符包装起来吗I/O?
- 如果不是,如何避免总是阻塞
os.read
调用?我更习惯于 read()
总是 returns 的类似文件的对象,如果到达流的末尾则只是 returns 一个空字符串。在这里,os.read
无论如何最终都会阻塞。
- 我很担心在没有意识到潜在危险(例如千分之一出现一次的竞争条件)的情况下将此脚本添加到 "just work"。我还需要注意什么?
我也同意首先使用 pty
和 subprocess
并不是最好的方法。
您可以使用 pexpect
to do this. Use the run()
function to obtain the data, and see the included VT100 emulator (or pyte
) 进行渲染。
以实用程序top
为例:
import time
import pexpect
import pexpect.ANSI
# Start 'top' and quit after a couple of seconds
output_buffer = pexpect.run('top', timeout=2)
# For continuous reading/interaction, you would need to use the "events"
# arg, threading, or a framework for asynchronous communication.
ansi_term = pexpect.ANSI.ANSI(24, 80)
ansi_term.write(output_buffer)
print(str(ansi_term))
(请注意,有时会导致 extra line spacings。)
如果程序没有产生太多输出;最简单的方法是使用 pexpect.run()
通过 pty
:
获取其输出
import pexpect # $ pip install pexpect
output, status = pexpect.run('top', timeout=2, withexitstatus=1)
您可以通过与之前的输出进行比较来检测输出是否为"settled down":
import pexpect # $ pip install pexpect
def every_second(d, last=[None]):
current = d['child'].before
if last[0] == current: # "settled down"
raise pexpect.TIMEOUT(None) # exit run
last[0] = current
output, status = pexpect.run('top', timeout=1, withexitstatus=1,
events={pexpect.TIMEOUT: every_second})
您可以使用与输出中的循环模式相匹配的正则表达式,而不是超时。目的是确定输出何时为 "settled down".
下面是直接使用subprocess
和pty
模块的代码对比:
#!/usr/bin/env python
"""Start process; wait 2 seconds; kill the process; print all process output."""
import errno
import os
import pty
import select
from subprocess import Popen, STDOUT
try:
from time import monotonic as timer
except ImportError:
from time import time as timer
output = []
master_fd, slave_fd = pty.openpty() #XXX add cleanup on exception
p = Popen(["top"], stdin=slave_fd, stdout=slave_fd, stderr=STDOUT,
close_fds=True)
os.close(slave_fd)
endtime = timer() + 2 # stop in 2 seconds
while True:
delay = endtime - timer()
if delay <= 0: # timeout
break
if select.select([master_fd], [], [], delay)[0]:
try:
data = os.read(master_fd, 1024)
except OSError as e: #NOTE: no need for IOError here
if e.errno != errno.EIO:
raise
break # EIO means EOF on some systems
else:
if not data: # EOF
break
output.append(data)
os.close(master_fd)
p.terminate()
returncode = p.wait()
print([returncode, b''.join(output)])
注:
- 子进程中的所有三个标准流都使用
slave_fd
,这与您答案中的代码使用 master_fd
for stdin
不同
- 代码读取输出 ,而 过程仍然是 运行。它允许接受大输出(超过内核中单个缓冲区的大小)
- 代码不会因 EIO 错误而丢失数据(此处表示 EOF)
基于Python subprocess readlines() hangs.
我正在尝试获取全屏终端程序的输出,该程序使用重绘转义码来呈现数据,并且需要 tty
(或 pty
)到 运行.
人类遵循的基本程序是:
- 在终端中启动程序。
- 程序使用重绘来显示和更新各个字段的数据。
- 人类等待直到显示一致(可能使用 "it's not flickering" 或 "it's been 0.5s since the last update" 等提示)。
- 人类看着特定位置的字段并记住或记录数据。
- 人退出程序。
- 然后人类根据该数据在程序外执行操作。
我想自动化这个过程。第 4 步和第 5 步可以按任意顺序完成。虽然我这个完美主义者担心屏幕状态的自洽性,但我承认我不太确定如何正确定义它(除了可能使用 "it's been more than a certain timeout period since the last update")。
似乎使用 pty
和 subprocess
后跟某种屏幕抓取工具是一种可能的方法,但我不清楚如何将它们一起使用,并且我正在使用的一些较低级别的对象存在哪些危险。
考虑这个程序:
#!/usr/bin/env python2
import os
import pty
import subprocess
import time
import pexpect.ANSI
# Psuedo-terminal FDs
fd_master, fd_slave = pty.openpty()
# Start 'the_program'
the_proc = subprocess.Popen(['the_program'], stdin=fd_master, stdout=fd_slave, stderr=fd_slave)
# Just kill it after a couple of seconds
time.sleep(2)
the_proc.terminate()
# Read output into a buffer
output_buffer = b''
read_size = None
while (read_size is None) or (read_size > 0):
chunk = os.read(fd_master, 1024)
output_buffer += chunk
read_size = len(chunk)
print("output buffer size: {:d}".format(len(output_buffer)))
# Feed output to screen scraper
ansi_term = pexpect.ANSI.ANSI(24, 80)
ansi_term.write(output_buffer)
# Parse presented data...
一个问题是 os.read()
调用总是阻塞。我还想知道是否有更好的方法来获取 pty
输出以供进一步使用。具体来说:
- 有没有办法用更高级别的代码来做到这一点(或其中的一部分)?我不能只将
subprocess.PIPE
用于我的Popen
调用,因为这样目标程序将无法运行。但是我可以用一些更方便的方法将这些文件描述符包装起来吗I/O? - 如果不是,如何避免总是阻塞
os.read
调用?我更习惯于read()
总是 returns 的类似文件的对象,如果到达流的末尾则只是 returns 一个空字符串。在这里,os.read
无论如何最终都会阻塞。 - 我很担心在没有意识到潜在危险(例如千分之一出现一次的竞争条件)的情况下将此脚本添加到 "just work"。我还需要注意什么?
我也同意首先使用 pty
和 subprocess
并不是最好的方法。
您可以使用 pexpect
to do this. Use the run()
function to obtain the data, and see the included VT100 emulator (or pyte
) 进行渲染。
以实用程序top
为例:
import time
import pexpect
import pexpect.ANSI
# Start 'top' and quit after a couple of seconds
output_buffer = pexpect.run('top', timeout=2)
# For continuous reading/interaction, you would need to use the "events"
# arg, threading, or a framework for asynchronous communication.
ansi_term = pexpect.ANSI.ANSI(24, 80)
ansi_term.write(output_buffer)
print(str(ansi_term))
(请注意,有时会导致 extra line spacings。)
如果程序没有产生太多输出;最简单的方法是使用 pexpect.run()
通过 pty
:
import pexpect # $ pip install pexpect
output, status = pexpect.run('top', timeout=2, withexitstatus=1)
您可以通过与之前的输出进行比较来检测输出是否为"settled down":
import pexpect # $ pip install pexpect
def every_second(d, last=[None]):
current = d['child'].before
if last[0] == current: # "settled down"
raise pexpect.TIMEOUT(None) # exit run
last[0] = current
output, status = pexpect.run('top', timeout=1, withexitstatus=1,
events={pexpect.TIMEOUT: every_second})
您可以使用与输出中的循环模式相匹配的正则表达式,而不是超时。目的是确定输出何时为 "settled down".
下面是直接使用subprocess
和pty
模块的代码对比:
#!/usr/bin/env python
"""Start process; wait 2 seconds; kill the process; print all process output."""
import errno
import os
import pty
import select
from subprocess import Popen, STDOUT
try:
from time import monotonic as timer
except ImportError:
from time import time as timer
output = []
master_fd, slave_fd = pty.openpty() #XXX add cleanup on exception
p = Popen(["top"], stdin=slave_fd, stdout=slave_fd, stderr=STDOUT,
close_fds=True)
os.close(slave_fd)
endtime = timer() + 2 # stop in 2 seconds
while True:
delay = endtime - timer()
if delay <= 0: # timeout
break
if select.select([master_fd], [], [], delay)[0]:
try:
data = os.read(master_fd, 1024)
except OSError as e: #NOTE: no need for IOError here
if e.errno != errno.EIO:
raise
break # EIO means EOF on some systems
else:
if not data: # EOF
break
output.append(data)
os.close(master_fd)
p.terminate()
returncode = p.wait()
print([returncode, b''.join(output)])
注:
- 子进程中的所有三个标准流都使用
slave_fd
,这与您答案中的代码使用master_fd
forstdin
不同
- 代码读取输出 ,而 过程仍然是 运行。它允许接受大输出(超过内核中单个缓冲区的大小)
- 代码不会因 EIO 错误而丢失数据(此处表示 EOF)
基于Python subprocess readlines() hangs.