在 Python 上安全地将输入逐行(从生成器)传递到子进程的标准输入

Safely pass input line by line (from generator) to subprocess' stdin on Python

我想用 subprocess 模块管理一个子进程,我需要将(非常)大量的行通过管道传输到子标准输入。我正在使用生成器创建输入,并像这样传递到子进程:

def my_gen (end): # simplified example
  for i in range(0, end):
    yield f"line {i}"

with subprocess.Popen(["command", "-o", "option_value"], # simplified example
  stdin = subprocess.PIPE, stdout = sys.stdout, stderr = sys.stderr) as process:
  for line in my_gen(1e7):
    process.stdin.write(line.encode()) # This is apparently not safe
  out, err = process.communicate() # out and err will be None, 
  # but this closes the process gracefully, which "with" does too

这会导致管道损坏错误,尽管它在我尝试过的每台机器上都不会一直发生:

Traceback (most recent call last):
  File "my_script", line 170, in <module>
    process.stdin.write(line.encode())
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "path/tolib/python3.8/subprocess.py", line 171, in <module>
  File "path/tolib/python3.8/subprocess.py", line 914, in __exit__
    self.stdin.close()
BrokenPipeError: [Errno 32] Broken pipe

那么,将输入逐行从生成器传递到子进程的安全方法是什么?

编辑:我一直在收到关于使用 communicate 的建议,这当然在文档中。这回答了如何安全地通信,但它不接受生成器作为输入。 Edit2:正如 Booboo 指出的那样,该示例将引发运行时错误(不是我在代码中发现的错误),对 range 的调用应该是 range(0, int(end)) 因此 my_gen 可以接受 [=15 中的数字=] 表示法。

文档说使用 .communicate:

Warning: Use communicate() rather than .stdin.write, .stdout.read or .stderr.read to avoid deadlocks due to any of the other OS pipe buffers filling up and blocking the child process.

https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate

首先,如果您希望 stdoutstderr 不被管道传输,那么要么根本不将这些参数指定给 Popen 调用,要么将它们的值指定为None,如果未指定则为默认值(但不要将这些指定为 sys.stdoutsys.stderr)。

为什么不呢?查看 Popen.communicate 方法的源代码,我可以看到有针对只有一个非 None 参数且该参数是 sysin[ 的情况的特殊优化代码=44=] 参数然后 Popen.communicate 通过简单地将过去的输入字符串写入管道 并忽略可能发生的任何 BrokenPipeError 错误 来实现。但是通过按原样传递 stdoutstderr 参数,我怀疑 communicate 很困惑,现在正在启动线程来处理处理,这最终会间歇性地导致您的异常。

现在我相信您可以在不使用 communicate 的情况下执行您的写入 也可以忽略 BrokenPipeError。当我尝试以下代码时(用 Popen 执行我自己的命令,将管道中的内容写入文件并使用文本模式),事实上,我没有遇到任何 BrokenPipeError 异常(我也不希望使用 stdoutstderr 的正确设置)。所以我不能发誓如果发生这样的异常,输出是否仍然正确。

顺便说一句,range 内置函数不接受浮点对象(至少对我而言不是),所以我不知道您如何指定 1e7 .

我还修改了代码以在每行末尾添加终止换行符并以文本模式处理,但您不应该觉得这样做是受限的。

import subprocess
import sys

def my_gen (end): # simplified example
    for i in range(0, end):
        yield f"line {i}\n"

with subprocess.Popen(["command", "-o", "option_value"], stdin=subprocess.PIPE, text=True) as process: # simplified example
    for line in my_gen(10_000_000):
        try:
            process.stdin.write(line)
        except BrokenPipeError as e:
            pass
    out, err = process.communicate()