使用 Popen 减少控制

controlling less with Popen

我正在尝试通过 Mac OSX 上的 Python 脚本控制 less。基本上我想要的是能够转发控制字符 (up/down/left/right) 但在 Python 程序中处理其他输入。我正在使用 Popen to start less, but less reads user input from a source other than stdin。因此,我不确定如何将任何字符发送到 less。

程序打开较少,等待一秒钟,然后尝试使用两个单独的通道发送 q 退出:stdin/dev/tty(因为它在 SO我在上面链接的问题)。都不行。

from subprocess import Popen, PIPE
import time
p1 = Popen("echo hello | less -K -R", stdin=PIPE, shell=True)
time.sleep(1)
p1.stdin.write(bytes('q', 'utf-8'))
with open("/dev/tty", 'w') as tty:
    tty.write('q')
p1.wait()

如何从 Python 脚本控制 less

有点复杂,但是可以用forkpty(3)创建一个新的TTY,你可以完全控制less,将输入和输出转发到原来的TTY,这样感觉无缝。

下面的代码使用了 Python 3 及其标准库。 pexpect 可以做很多繁重的工作,但 Python 没有。而且这样更有教育意义。

import contextlib
import fcntl
import io
import os
import pty
import select
import signal
import struct
import termios
import time
import tty

假设代码的其余部分在此上下文管理器中缩进到 运行。

with contextlib.ExitStack() as stack:

我们需要获取真正的 TTY 并将其设置为原始模式。这可能会使 TTY 的其他用户感到困惑(例如,此程序退出后的 shell),因此请务必在之后将其恢复到相同状态。

tty_fd = os.open('/dev/tty', os.O_RDWR | os.O_CLOEXEC)
stack.callback(os.close, tty_fd)
tc = termios.tcgetattr(tty_fd)
stack.callback(termios.tcsetattr, tty_fd, termios.TCSANOW, tc)
tty.setraw(tty_fd, when=termios.TCSANOW)

然后我们可以调用forkpty,它在Python中被命名为pty.fork()。这有几件事:

  • 创建 pseudoterminal.
  • 分叉一个新的 child。
  • 将 child 附加到 PTY 的从端。
  • Return child的PID和PTY的master端到原进程。

child应该运行less。请注意 _exit(2) 的使用,因为在 fork.

之后继续执行其他代码可能是不安全的
child_pid, master_fd = pty.fork()
if child_pid == 0:
    os.execv('/bin/sh', ('/bin/sh', '-c', 'echo hello | less -K -R'))
    os._exit(0)
stack.callback(os.close, master_fd)

然后需要做一些工作来设置一些异步信号处理程序。

  • SIGCHLD 在 child 进程改变状态(例如退出)时接收。我们可以用它来跟踪 child 是否仍然是 运行ning.
  • SIGWINCH 当控制终端改变大小时收到。我们将此大小转发给 PTY(它会自动向附加到它的进程发送另一个 window 更改信号)。我们也应该设置 PTY 的 window 大小以匹配开始。

转发SIGINTSIGTERM等信号也可能有意义

child_is_running = True
def handle_chld(signum, frame):
    while True:
        pid, status = os.waitpid(-1, os.P_NOWAIT)
        if not pid:
            break
        if pid == child_pid:
            child_is_running = False
def handle_winch(signum, frame):
    tc = struct.pack('HHHH', 0, 0, 0, 0)
    tc = fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, tc)
    fcntl.ioctl(master_fd, termios.TIOCSWINSZ, tc)
handler = signal.signal(signal.SIGCHLD, handle_chld)
stack.callback(signal.signal, signal.SIGCHLD, handler)
handler = signal.signal(signal.SIGWINCH, handle_winch)
stack.callback(signal.signal, signal.SIGWINCH, handler)
handle_winch(0, None)

现在是真正的肉:在真实和虚假的 TTY 之间复制数据。

target_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) + 1
has_sent_q = False
with contextlib.suppress(OSError):
    while child_is_running:
        now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
        if now < target_time:
            timeout = target_time - now
        else:
            timeout = None
            if not has_sent_q:
                os.write(master_fd, b'q')
                has_sent_q = True
        rfds, wfds, xfds = select.select((tty_fd, master_fd), (), (), timeout)
        if tty_fd in rfds:
            data = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE)
            os.write(master_fd, data)
        if master_fd in rfds:
            data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)
            os.write(tty_fd, data)

它看起来很简单,尽管我掩盖了一些事情,例如适当的短写和 SIGTTIN/SIGTTOU 处理(通过抑制 OSError 部分隐藏)。