在 Python 子进程模块中过滤掉需要终端的命令

Filter out command that needs a terminal in Python subprocess module

我正在开发一个机器人,它接受来自网络 (XMPP) 的命令并使用 Python 中的子进程模块来执行它们并发回命令的输出。本质上它是一个类似于 SSH 的基于 XMPP 的非交互式 shell.

机器人只执行来自经过身份验证的可信来源的命令,因此允许任意 shell 命令 (shell=True)。

但是,当我不小心发送了一些需要tty的命令时,机器人卡住了。

例如:

subprocess.check_output(['vim'], shell=False)
subprocess.check_output('vim', shell=True)

如果收到上述每个命令,机器人就会卡住,机器人 运行 所在的终端已损坏。

虽然机​​器人只接收来自经过身份验证的可信来源的命令,但人类会犯错。我怎样才能让机器人过滤掉那些会破坏自己的命令?我知道有 os.isatty 但我该如何使用它?有没有办法检测那些 "bad" 命令并拒绝执行它们?

TL;DR:

说,有两种命令:

  • ls 这样的命令:不需要 tty 到 运行。
  • 类似vim的命令:需要tty;如果没有给出 tty,则中断子进程。

我怎么知道一个命令是 ls-like 还是 vim-like 并且拒绝 运行 命令是 vim-like?

好吧,SSH 已经是一个允许用户remotely execute commands and be authenticated at the same time 的工具。身份验证部分非常棘手,请注意,从安全角度来看,构建您描述的软件有点冒险。

无法确定进程是否需要 tty。并且没有 os.isatty 方法,因为如果您 运行 一个需要一个的子流程并不意味着有一个。 :)

一般来说,如果您考虑使用白名单命令,从安全角度来看可能会更安全,并且也是解决此问题的方法。您可以选择该白名单来避免需要 tty 的事情,因为我认为您不会轻易解决这个问题。

非常感谢@J.F。在 Sebastia 的帮助下(请参阅问题下的评论),我已经为我的案例找到了解决方案(解决方法?)。

vim 中断终端而 ls 不中断的原因是 vim 需要一个 tty。正如 Sebastia 所说,我们可以使用 pty.openpty() 为 vim 提供 pty。输入 pty 保证命令不会破坏终端,我们可以添加 timout 来自动终止此类进程。这是(肮脏的)工作示例:

#!/usr/bin/env python3

import pty
from subprocess import STDOUT, check_output, TimeoutExpired

master_fd, slave_fd = pty.openpty()


try:
    output1 = check_output(['ls', '/'], stdin=slave_fd, stderr=STDOUT, universal_newlines=True, timeout=3)
    print(output1)
except TimeoutExpired:
    print('Timed out')

try:
    output2 = check_output(['vim'], stdin=slave_fd, stderr=STDOUT, universal_newlines=True, timeout=3)
    print(output2)
except TimeoutExpired:
    print('Timed out')

请注意,我们需要处理的是标准输入,而不是标准输出或标准错误。

您期望的是一个函数,它接收命令作为输入,并且 returns 有意义的 由 运行 命令输出。

由于命令是任意的,对 tty 的要求只是可能发生的许多不良情况之一(其他包括 运行ning 无限循环),您的函数应该只关心它的 运行ning期间,换句话说,一个命令是否“坏”应该由它是否在有限的时间内结束来确定,并且由于 subprocess 本质上是异步的,你可以只 运行 命令以更高的眼光看待它。

要播放的演示代码,您可以更改 cmd 值以查看其性能有何不同:

#!/usr/bin/env python
# coding: utf-8

import time
import subprocess
from subprocess import PIPE


#cmd = ['ls']
#cmd = ['sleep', '3']
cmd = ['vim', '-u', '/dev/null']

print 'call cmd'
p = subprocess.Popen(cmd, shell=True,
                     stdin=PIPE, stderr=PIPE, stdout=PIPE)
print 'called', p

time_limit = 2
timer = 0
time_gap = 0.2

ended = False
while True:
    time.sleep(time_gap)

    returncode = p.poll()
    print 'process status', returncode

    timer += time_gap
    if timer >= time_limit:
        print 'timeout, kill process'
        p.kill()
        break

    if returncode is not None:
        ended = True
        break

if ended:
    print 'process ended by', returncode

    print 'read'
    out, err = p.communicate()
    print 'out', repr(out)
    print 'error', repr(err)
else:
    print 'process failed'

以上代码有三点值得注意:

  1. 我们使用Popen而不是check_output来运行命令,不像check_output会等待进程结束,Popen returns 立即,因此我们可以做进一步的事情来控制过程。

  2. 我们实现了一个计时器来检查进程的状态,如果它 运行s 太长,我们手动杀死它,因为我们认为如果进程不能结束就没有意义在有限的时间内。这样你原来的问题就解决了,因为vim永远不会结束,它肯定会被当作“无意义”的命令杀死。

  3. 定时器帮我们过滤掉坏命令后,我们可以通过调用Popen对象的communicate方法获取命令的stdout和stderr,之后就是你的了选择决定向用户return发送什么。

结论

不需要tty模拟,我们应该运行异步子进程,然后通过定时器控制它来决定它是否应该被杀死,对于那些正常结束的,它安全且容易获得输出。

可以参考我的回答:,使用伪终端使stdout无阻塞,句柄stdin/stdout.[=14=使用select ]

我可以将 command 变量修改为 'vim'。脚本运行良好。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'vim'

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() process the leader of a new session, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)