如何从使用屏幕重绘的程序获取输出以用于终端屏幕抓取工具?

How do I obtain the output from a program that uses screen redrawing for use in a terminal screen scraper?

我正在尝试获取全屏终端程序的输出,该程序使用重绘转义码来呈现数据,并且需要 tty(或 pty)到 运行.

人类遵循的基本程序是:

  1. 在终端中启动程序。
  2. 程序使用重绘来显示和更新各个字段的数据。
  3. 人类等待直到显示一致(可能使用 "it's not flickering" 或 "it's been 0.5s since the last update" 等提示)。
  4. 人类看着特定位置的字段并记住或记录数据。
  5. 人退出程序。
  6. 然后人类根据该数据在程序外执行操作。

我想自动化这个过程。第 4 步和第 5 步可以按任意顺序完成。虽然我这个完美主义者担心屏幕状态的自洽性,但我承认我不太确定如何正确定义它(除了可能使用 "it's been more than a certain timeout period since the last update")。

似乎使用 ptysubprocess 后跟某种屏幕抓取工具是一种可能的方法,但我不清楚如何将它们一起使用,并且我正在使用的一些较低级别的对象存在哪些危险。

考虑这个程序:

#!/usr/bin/env python2
import os
import pty
import subprocess
import time

import pexpect.ANSI

# Psuedo-terminal FDs
fd_master, fd_slave = pty.openpty()

# Start 'the_program'
the_proc = subprocess.Popen(['the_program'], stdin=fd_master, stdout=fd_slave, stderr=fd_slave)

# Just kill it after a couple of seconds
time.sleep(2)
the_proc.terminate()

# Read output into a buffer
output_buffer = b''
read_size = None

while (read_size is None) or (read_size > 0):
    chunk = os.read(fd_master, 1024)
    output_buffer += chunk
    read_size = len(chunk)

print("output buffer size: {:d}".format(len(output_buffer)))

# Feed output to screen scraper
ansi_term = pexpect.ANSI.ANSI(24, 80)
ansi_term.write(output_buffer)

# Parse presented data... 

一个问题是 os.read() 调用总是阻塞。我还想知道是否有更好的方法来获取 pty 输出以供进一步使用。具体来说:

  1. 有没有办法用更高级别的代码来做到这一点(或其中的一部分)?我不能只将 subprocess.PIPE 用于我的 Popen 调用,因为这样目标程序将无法运行。但是我可以用一些更方便的方法将这些文件描述符包装起来吗I/O?
  2. 如果不是,如何避免总是阻塞 os.read 调用?我更习惯于 read() 总是 returns 的类似文件的对象,如果到达流的末尾则只是 returns 一个空字符串。在这里,os.read 无论如何最终都会阻塞。
  3. 我很担心在没有意识到潜在危险(例如千分之一出现一次的竞争条件)的情况下将此脚本添加到 "just work"。我还需要注意什么?

我也同意首先使用 ptysubprocess 并不是最好的方法。

您可以使用 pexpect to do this. Use the run() function to obtain the data, and see the included VT100 emulator (or pyte) 进行渲染。

以实用程序top为例:

import time
import pexpect
import pexpect.ANSI

# Start 'top' and quit after a couple of seconds
output_buffer = pexpect.run('top', timeout=2)

# For continuous reading/interaction, you would need to use the "events"
# arg, threading, or a framework for asynchronous communication.

ansi_term = pexpect.ANSI.ANSI(24, 80)
ansi_term.write(output_buffer)
print(str(ansi_term))

(请注意,有时会导致 extra line spacings。)

如果程序没有产生太多输出;最简单的方法是使用 pexpect.run() 通过 pty:

获取其输出
import pexpect # $ pip install pexpect

output, status = pexpect.run('top', timeout=2, withexitstatus=1)

您可以通过与之前的输出进行比较来检测输出是否为"settled down":

import pexpect # $ pip install pexpect

def every_second(d, last=[None]):
    current = d['child'].before
    if last[0] == current: # "settled down"
        raise pexpect.TIMEOUT(None) # exit run
    last[0] = current

output, status =  pexpect.run('top', timeout=1, withexitstatus=1,
                              events={pexpect.TIMEOUT: every_second})

您可以使用与输出中的循环模式相匹配的正则表达式,而不是超时。目的是确定输出何时为 "settled down".

下面是直接使用subprocesspty模块的代码对比:

#!/usr/bin/env python
"""Start process; wait 2 seconds; kill the process; print all process output."""
import errno
import os
import pty
import select
from subprocess import Popen, STDOUT
try:
    from time import monotonic as timer
except ImportError:
    from time import time as timer

output = []
master_fd, slave_fd = pty.openpty() #XXX add cleanup on exception
p = Popen(["top"], stdin=slave_fd, stdout=slave_fd, stderr=STDOUT,
          close_fds=True)
os.close(slave_fd)
endtime = timer() + 2 # stop in 2 seconds
while True:
    delay = endtime - timer()
    if delay <= 0: # timeout
        break
    if select.select([master_fd], [], [], delay)[0]:
        try:
            data = os.read(master_fd, 1024)
        except OSError as e: #NOTE: no need for IOError here
            if e.errno != errno.EIO:
                raise
            break # EIO means EOF on some systems
        else:
            if not data: # EOF
                break
            output.append(data)
os.close(master_fd)
p.terminate()
returncode = p.wait()
print([returncode, b''.join(output)])

注:

  • 子进程中的所有三个标准流都使用 slave_fd,这与您答案中的代码使用 master_fd for stdin
  • 不同
  • 代码读取输出 ,而 过程仍然是 运行。它允许接受大输出(超过内核中单个缓冲区的大小)
  • 代码不会因 EIO 错误而丢失数据(此处表示 EOF)

基于Python subprocess readlines() hangs.