在 Python 上安全地将输入逐行(从生成器)传递到子进程的标准输入

Safely pass input line by line (from generator) to subprocess' stdin on Python

我想用 subprocess 模块管理一个子进程,我需要将(非常)大量的行通过管道传输到子标准输入。我正在使用生成器创建输入,并像这样传递到子进程:

def my_gen (end): # simplified example
  for i in range(0, end):
    yield f"line {i}"

with subprocess.Popen(["command", "-o", "option_value"], # simplified example
  stdin = subprocess.PIPE, stdout = sys.stdout, stderr = sys.stderr) as process:
  for line in my_gen(1e7):
    process.stdin.write(line.encode()) # This is apparently not safe
  out, err = process.communicate() # out and err will be None, 
  # but this closes the process gracefully, which "with" does too


Traceback (most recent call last):
  File "my_script", line 170, in <module>
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "path/tolib/python3.8/subprocess.py", line 171, in <module>
  File "path/tolib/python3.8/subprocess.py", line 914, in __exit__
BrokenPipeError: [Errno 32] Broken pipe


编辑:我一直在收到关于使用 communicate 的建议,这当然在文档中。这回答了如何安全地通信,但它不接受生成器作为输入。 Edit2:正如 Booboo 指出的那样,该示例将引发运行时错误(不是我在代码中发现的错误),对 range 的调用应该是 range(0, int(end)) 因此 my_gen 可以接受 [=15 中的数字=] 表示法。

文档说使用 .communicate:

Warning: Use communicate() rather than .stdin.write, .stdout.read or .stderr.read to avoid deadlocks due to any of the other OS pipe buffers filling up and blocking the child process.


首先,如果您希望 stdoutstderr 不被管道传输,那么要么根本不将这些参数指定给 Popen 调用,要么将它们的值指定为None,如果未指定则为默认值(但不要将这些指定为 sys.stdoutsys.stderr)。

为什么不呢?查看 Popen.communicate 方法的源代码,我可以看到有针对只有一个非 None 参数且该参数是 sysin[ 的情况的特殊优化代码=44=] 参数然后 Popen.communicate 通过简单地将过去的输入字符串写入管道 并忽略可能发生的任何 BrokenPipeError 错误 来实现。但是通过按原样传递 stdoutstderr 参数,我怀疑 communicate 很困惑,现在正在启动线程来处理处理,这最终会间歇性地导致您的异常。

现在我相信您可以在不使用 communicate 的情况下执行您的写入 也可以忽略 BrokenPipeError。当我尝试以下代码时(用 Popen 执行我自己的命令,将管道中的内容写入文件并使用文本模式),事实上,我没有遇到任何 BrokenPipeError 异常(我也不希望使用 stdoutstderr 的正确设置)。所以我不能发誓如果发生这样的异常,输出是否仍然正确。

顺便说一句,range 内置函数不接受浮点对象(至少对我而言不是),所以我不知道您如何指定 1e7 .


import subprocess
import sys

def my_gen (end): # simplified example
    for i in range(0, end):
        yield f"line {i}\n"

with subprocess.Popen(["command", "-o", "option_value"], stdin=subprocess.PIPE, text=True) as process: # simplified example
    for line in my_gen(10_000_000):
        except BrokenPipeError as e:
    out, err = process.communicate()