为 Python3 中的 运行 个二进制程序创建最小沙箱

Creating a minimal sandbox for running binary programs in Python3

我正在尝试在最小且安全的环境中为 运行ning 学生的代码构建一个 Python 沙箱。我打算将它 运行 放入一个容器中,并限制它对该容器资源的访问。因此,我目前正在设计应该 运行 到容器中并处理对资源的访问的沙箱部分。

目前,我的规范是限制进程使用的时间和内存量。我还需要能够通过 stdin 与进程通信,并在执行结束时捕获 retcodestdoutstderr

此外,该程序可能会进入无限循环并通过 stdoutstderr 填满内存(我有一个学生的程序因此导致我的容器崩溃)。所以,我还希望能够限制恢复的 stdoutstderr 的大小(达到一定限制后我可以终止进程并忽略其余输出。我不关心这些额外的数据,因为它很可能是一个错误的程序,应该被丢弃)。

目前,我的沙箱几乎可以捕获所有内容,这意味着我可以:

这是我当前的代码(为了示例,我尽量保持它的小):

MEMORY_LIMIT  = 64 * 1024 * 1024
TIMEOUT_LIMIT = 5 * 60

__NR_FILE_NOT_FOUND = -1
__NR_TIMEOUT        = -2
__NR_MEMORY_OUT     = -3

def limit_memory(memory):
    import resource
    return lambda :resource.setrlimit(resource.RLIMIT_AS, (memory, memory))

def run_program(cmd, sinput='', timeout=TIMEOUT_LIMIT, memory=MEMORY_LIMIT):
    """Run the command line and output (ret, sout, serr)."""
    from subprocess import Popen, PIPE
    try:
        proc =  Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                      preexec_fn=limit_memory(memory))
    except FileNotFoundError:
        return (__NR_FILE_NOT_FOUND, "", "")

    sout, serr = "".encode("utf-8"), "".encode("utf-8")
    try:
        sout, serr = proc.communicate(sinput.encode("utf-8"), timeout=timeout)
        ret = proc.wait()
    except subprocess.TimeoutExpired:
        ret = __NR_TIMEOUT
    except MemoryError:
        ret = __NR_MEMORY_OUT
    return (ret, sout.decode("utf-8"), serr.decode("utf-8"))

if __name__ == "__main__":
    ret, out, err = run_program(['./example.sh'], timeout=8)
    print("return code: %i\n" % ret)
    print("stdout:\n%s" % out)
    print("stderr:\n%s" % err)

缺少的功能是:

  1. stdoutstderr 的大小设置限制。我在网上看了几次尝试,但 none 确实有效。

  2. 将函数附加到 stdin 比仅静态字符串更好。该函数应连接到管道 stdoutstderr 和 return 字节到 stdin.

有人对此有想法吗?

PS:我已经看过了:

正如我所说,您可以创建自己的缓冲区并将 STDOUT/STDERR 写入其中,同时检查大小。为了方便起见,您可以编写一个小的 io.BytesIO 包装器来为您进行检查,例如:

from io import BytesIO

# lets first create a size-controlled BytesIO buffer for convenience
class MeasuredStream(BytesIO):

    def __init__(self, maxsize=1024):  # lets use a 1 KB as a default
        super(MeasuredStream, self).__init__()
        self.maxsize = maxsize
        self.length = 0

    def write(self, b):
        if self.length + len(b) > self.maxsize:  # o-oh, max size exceeded
            # write only up to maxsize, truncate the rest
            super(MeasuredStream, self).write(b[:self.maxsize - self.length])
            raise ValueError("Max size reached, excess data is truncated")
        # plenty of space left, write the bytes and increase the length
        self.length += super(MeasuredStream, self).write(b)
        return len(b)  # convention: return the written number of bytes 

请注意,如果您打算进行截断/查找和替换,则必须考虑 length 中的内容,但这足以满足我们的目的。

无论如何,现在您需要做的就是处理自己的流并考虑来自 MeasuredStream 的可能 ValueError,而不是使用 Popen.communicate()。不幸的是,这也意味着您必须自己处理超时。类似于:

from subprocess import Popen, PIPE, STDOUT, TimeoutExpired
import sys
import time

MEMORY_LIMIT  = 64 * 1024 * 1024
TIMEOUT_LIMIT = 5 * 60
STDOUT_LIMIT  = 1024 * 1024  # let's use 1 MB as a STDOUT limit

__NR_FILE_NOT_FOUND      = -1
__NR_TIMEOUT             = -2
__NR_MEMORY_OUT          = -3
__NR_MAX_STDOUT_EXCEEDED = -4  # let's add a new return code

# a cross-platform precision clock
get_timer = time.clock if sys.platform == "win32" else time.time

def limit_memory(memory):
    import resource
    return lambda :resource.setrlimit(resource.RLIMIT_AS, (memory, memory))

def run_program(cmd, sinput='', timeout=TIMEOUT_LIMIT, memory=MEMORY_LIMIT):
    """Run the command line and output (ret, sout, serr)."""
    from subprocess import Popen, PIPE, STDOUT
    try:
        proc =  Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                      preexec_fn=limit_memory(memory), timeout=timeout)
    except FileNotFoundError:
        return (__NR_FILE_NOT_FOUND, "", "")
    sout = MeasuredStream(STDOUT_LIMIT)  # store STDOUT in a measured stream
    start_time = get_timer()  # store a reference timer for our custom timeout
    try:
        proc.stdin.write(sinput.encode("utf-8"))  # write the input to STDIN
        proc.stdin.flush()  # flush the STDOUT buffer
        while True:  # our main listener loop
            line = proc.stdout.readline()  # read a line from the STDOUT
            # use proc.stdout.read(buf_size) instead to handle your own buffer
            if line != b"":  # content collected...
                sout.write(line)  # write it to our stream
            elif proc.poll() is not None:  # process finished, nothing to do
                break
            # finally, check the current time progress...
            if get_timer() >= start_time + TIMEOUT_LIMIT:
                raise TimeoutExpired(proc.args, TIMEOUT_LIMIT)
        ret = proc.poll()  # get the return code
    except TimeoutExpired:
        proc.kill()  # we're no longer interested in the process, kill it
        ret = __NR_TIMEOUT
    except MemoryError:
        ret = __NR_MEMORY_OUT
    except ValueError:  # max buffer reached
        proc.kill()  # we're no longer interested in the process, kill it
        ret = __NR_MAX_STDOUT_EXCEEDED
    sout.seek(0)  # rewind the buffer
    return ret, sout.read().decode("utf-8")  # send the results back

if __name__ == "__main__":
    ret, out, err = run_program(['./example.sh'], timeout=8)
    print("return code: %i\n" % ret)
    print("stdout:\n%s" % out)
    print("stderr:\n%s" % err)

有两个 'issues' 这个,第一个很明显 - 我正在将子进程 STDERR 管道传输到 STDOUT,所以结果将是一个混合。因为从 STDOUT 和 STDERR 流读取是一个阻塞操作,如果你想分别读取它们,你将不得不产生两个线程(并在超过流大小时单独处理它们的 ValueError 异常)。第二个问题是子进程 STDOUT 可以锁定超时检查,因为它取决于 STDOUT 实际上刷新一些数据。这也可以通过一个单独的计时器线程来解决,如果超过超时,该线程将强制终止进程。事实上,这正是 Popen.communicate() 所做的。

操作原理基本上是相同的,您只需将检查外包给单独的线程,最后将所有内容重新连接起来。这是我留给你的练习 ;)

关于你的第二个缺失的功能,你能详细说明一下你的想法吗?

看来这个问题比想象中的要复杂得多,我好不容易在网上找到了解决方案并全部理解了。

事实上,问题的复杂性来自于有多种解决方法。我探索了三种方式(threadingmultiprocessingasyncio)。

最后我选择了使用一个单独的线程来监听当前子进程并捕获程序的输出。在我看来,这是最简单、最便携和最有效的处理方式。

因此,此解决方案背后的基本思想是创建一个将侦听 stdoutstderr 并收集所有输出的线程。当达到限制时,您只需终止进程并 return.

这是我的代码的简化版本:

from subprocess import Popen, PIPE, TimeoutExpired
from queue import Queue
from time import sleep
from threading import Thread

MAX_BUF = 35

def stream_reader(p, q, n):
    stdout_buf, stderr_buf = b'', b''
    while p.poll() is None:
        sleep(0.1)
        stdout_buf += p.stdout.read(n)
        stderr_buf += p.stderr.read(n)
        if (len(stdout_buf) > n) or (len(stderr_buf) > n):
            stdout_buf, stderr_buf = stdout_buf[:n],  stderr_buf[:n]
            try:
                p.kill()
            except ProcessLookupError:
                pass
            break
    q.put((stdout_buf.decode('utf-8', errors="ignore"),
           stderr_buf.decode('utf-8', errors="ignore")))

# Main function    
cmd = ['./example.sh']

proc = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)
q = Queue()

t_io = Thread(target=stream_reader, args=(proc, q, MAX_BUF,), daemon=True)
t_io.start()

# Running the process
try:
    proc.stdin.write(b'AAAAAAA')
    proc.stdin.close()
except IOError:
    pass

try:
    ret = proc.wait(timeout=20)
except TimeoutExpired:
    ret = -1 # Or whatever code you decide to give it.

t_io.join()
sout, serr = q.get()

print(ret, sout, serr)

您可以将任何内容附加到 example.sh 脚本,即 运行。请注意,这里避免了一些陷阱以避免死锁和损坏的代码(我对该脚本进行了一些测试)。然而,我并不完全确定这个脚本,所以请不要犹豫,指出明显的错误或改进。