如何 运行 Python 子处理和流式处理以及过滤 stdout 和 stderr?

How to run Python subprocess and stream but also filter stdout and stderr?

我有一个类似服务器的应用程序,我想从 Python 运行。它永远不会停止,直到用户中断它。我想在应用程序 运行s 时不断将 stdout 和 stderr 重定向到父级。幸运的是,这正是 subprocess.run 所做的。

Shell:

$ my-app
1
2
3
...

wrapper.py:

import subprocess
subprocess.run(['my-app'])

正在执行 wrapper.py:

$ python wrapper.py
1
2
3
...

我相信这要归功于 subprocess.run 从父进程继承了 stdout 和 stderr 文件描述符。好。

但现在我需要在应用程序输出特定行时做一些事情。想象一下,当输出行将包含 4:

时,我想 运行 任意 Python 代码
$ python wrapper.py
1
2
3
4   <-- here I want to do something
...

或者我想从输出中删除一些行:

$ python wrapper.py   <-- allowed only odd numbers
1
3
...

我想我可以有一个过滤功能,我会以某种方式将其挂接到 subprocess.run 并且它会在输出的每一行中被调用,无论它是 stdout 还是 stderr:

def filter_fn(line):
    if line ...:
        return line.replace(...
    ...

但是如何实现呢?如何将此类或类似函数挂接到 subprocess.run 调用中?


注意: 我不能使用 sh 库,因为它对 Windows.

的支持为零

如果您希望能够处理子进程的 stdout 或 stderr,只需为参数 stdout(resp. stderr)传递 subprocess.PIPE。然后您可以从子进程访问输出流作为 proc.stdout,默认情况下作为字节流,但您可以使用 universal_newlines = True 将其作为字符串获取。示例:

import subprocess
app = subprocess.Popen(['my-app'], stdout = subprocess.PIPE, universal_newlines = True)
for line in app.stdout:
    if line.strip() == '4':
        # special processing
    else:
        sys.stdout.write(line)

你必须注意的是,为了能够在子进程写入后立即处理输出,子进程必须在每一行之后刷新输出。默认情况下,stdout 在定向到终端时是行缓冲的 - 每行都打印在换行符上 - 但在定向到文件或管道时是 size buffered,这意味着它仅在每个8k 或 16k 个字符。

在这种情况下,无论您对调用者大小做什么,您都只会在程序完成时获得标准输出。

我相信这段代码可以做到。上一个答案没有解决同时从两个流读取的问题,这需要异步。否则,另一个答案可能适用于过滤 stdout,然后在 stdout 之后执行 stderr。

这是 python 3.8,它对 asyncio 有更多的描述性方法名称。

2021 年 8 月 25 日更新:使用 asyncio.run 和 asyncio.gather 作为更高级别,更容易理解函数而不是直接操作异步循环。

import sys
import asyncio


async def output_filter(input_stream, output_stream):
    while not input_stream.at_eof():
        output = await input_stream.readline()
        if not output.startswith(b"filtered"):
            output_stream.buffer.write(output)
            output_stream.flush()


async def run_command(command):
    process = await asyncio.create_subprocess_exec(
        *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    await asyncio.gather(
        output_filter(process.stderr, sys.stderr),
        output_filter(process.stdout, sys.stdout),
    )
    # process.communicate() will have no data to read but will close the
    # pipes that are implemented in C, whereas process.wait() will not
    await process.communicate()


def main():
    asyncio.run(run_command(["python", "sample_process.py"]))


if __name__ == "__main__":
    main()