如何 运行 Python 子处理和流式处理以及过滤 stdout 和 stderr?
How to run Python subprocess and stream but also filter stdout and stderr?
我有一个类似服务器的应用程序,我想从 Python 运行。它永远不会停止,直到用户中断它。我想在应用程序 运行s 时不断将 stdout 和 stderr 重定向到父级。幸运的是,这正是 subprocess.run
所做的。
Shell:
$ my-app
1
2
3
...
wrapper.py
:
import subprocess
subprocess.run(['my-app'])
正在执行 wrapper.py
:
$ python wrapper.py
1
2
3
...
我相信这要归功于 subprocess.run
从父进程继承了 stdout 和 stderr 文件描述符。好。
但现在我需要在应用程序输出特定行时做一些事情。想象一下,当输出行将包含 4
:
时,我想 运行 任意 Python 代码
$ python wrapper.py
1
2
3
4 <-- here I want to do something
...
或者我想从输出中删除一些行:
$ python wrapper.py <-- allowed only odd numbers
1
3
...
我想我可以有一个过滤功能,我会以某种方式将其挂接到 subprocess.run
并且它会在输出的每一行中被调用,无论它是 stdout 还是 stderr:
def filter_fn(line):
if line ...:
return line.replace(...
...
但是如何实现呢?如何将此类或类似函数挂接到 subprocess.run
调用中?
注意: 我不能使用 sh
库,因为它对 Windows.
的支持为零
如果您希望能够处理子进程的 stdout 或 stderr,只需为参数 stdout
(resp. stderr
)传递 subprocess.PIPE
。然后您可以从子进程访问输出流作为 proc.stdout
,默认情况下作为字节流,但您可以使用 universal_newlines = True
将其作为字符串获取。示例:
import subprocess
app = subprocess.Popen(['my-app'], stdout = subprocess.PIPE, universal_newlines = True)
for line in app.stdout:
if line.strip() == '4':
# special processing
else:
sys.stdout.write(line)
你必须注意的是,为了能够在子进程写入后立即处理输出,子进程必须在每一行之后刷新输出。默认情况下,stdout 在定向到终端时是行缓冲的 - 每行都打印在换行符上 - 但在定向到文件或管道时是 size buffered,这意味着它仅在每个8k 或 16k 个字符。
在这种情况下,无论您对调用者大小做什么,您都只会在程序完成时获得标准输出。
我相信这段代码可以做到。上一个答案没有解决同时从两个流读取的问题,这需要异步。否则,另一个答案可能适用于过滤 stdout,然后在 stdout 之后执行 stderr。
这是 python 3.8,它对 asyncio 有更多的描述性方法名称。
2021 年 8 月 25 日更新:使用 asyncio.run 和 asyncio.gather 作为更高级别,更容易理解函数而不是直接操作异步循环。
import sys
import asyncio
async def output_filter(input_stream, output_stream):
while not input_stream.at_eof():
output = await input_stream.readline()
if not output.startswith(b"filtered"):
output_stream.buffer.write(output)
output_stream.flush()
async def run_command(command):
process = await asyncio.create_subprocess_exec(
*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
await asyncio.gather(
output_filter(process.stderr, sys.stderr),
output_filter(process.stdout, sys.stdout),
)
# process.communicate() will have no data to read but will close the
# pipes that are implemented in C, whereas process.wait() will not
await process.communicate()
def main():
asyncio.run(run_command(["python", "sample_process.py"]))
if __name__ == "__main__":
main()
我有一个类似服务器的应用程序,我想从 Python 运行。它永远不会停止,直到用户中断它。我想在应用程序 运行s 时不断将 stdout 和 stderr 重定向到父级。幸运的是,这正是 subprocess.run
所做的。
Shell:
$ my-app
1
2
3
...
wrapper.py
:
import subprocess
subprocess.run(['my-app'])
正在执行 wrapper.py
:
$ python wrapper.py
1
2
3
...
我相信这要归功于 subprocess.run
从父进程继承了 stdout 和 stderr 文件描述符。好。
但现在我需要在应用程序输出特定行时做一些事情。想象一下,当输出行将包含 4
:
$ python wrapper.py
1
2
3
4 <-- here I want to do something
...
或者我想从输出中删除一些行:
$ python wrapper.py <-- allowed only odd numbers
1
3
...
我想我可以有一个过滤功能,我会以某种方式将其挂接到 subprocess.run
并且它会在输出的每一行中被调用,无论它是 stdout 还是 stderr:
def filter_fn(line):
if line ...:
return line.replace(...
...
但是如何实现呢?如何将此类或类似函数挂接到 subprocess.run
调用中?
注意: 我不能使用 sh
库,因为它对 Windows.
如果您希望能够处理子进程的 stdout 或 stderr,只需为参数 stdout
(resp. stderr
)传递 subprocess.PIPE
。然后您可以从子进程访问输出流作为 proc.stdout
,默认情况下作为字节流,但您可以使用 universal_newlines = True
将其作为字符串获取。示例:
import subprocess
app = subprocess.Popen(['my-app'], stdout = subprocess.PIPE, universal_newlines = True)
for line in app.stdout:
if line.strip() == '4':
# special processing
else:
sys.stdout.write(line)
你必须注意的是,为了能够在子进程写入后立即处理输出,子进程必须在每一行之后刷新输出。默认情况下,stdout 在定向到终端时是行缓冲的 - 每行都打印在换行符上 - 但在定向到文件或管道时是 size buffered,这意味着它仅在每个8k 或 16k 个字符。
在这种情况下,无论您对调用者大小做什么,您都只会在程序完成时获得标准输出。
我相信这段代码可以做到。上一个答案没有解决同时从两个流读取的问题,这需要异步。否则,另一个答案可能适用于过滤 stdout,然后在 stdout 之后执行 stderr。
这是 python 3.8,它对 asyncio 有更多的描述性方法名称。
2021 年 8 月 25 日更新:使用 asyncio.run 和 asyncio.gather 作为更高级别,更容易理解函数而不是直接操作异步循环。
import sys
import asyncio
async def output_filter(input_stream, output_stream):
while not input_stream.at_eof():
output = await input_stream.readline()
if not output.startswith(b"filtered"):
output_stream.buffer.write(output)
output_stream.flush()
async def run_command(command):
process = await asyncio.create_subprocess_exec(
*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
await asyncio.gather(
output_filter(process.stderr, sys.stderr),
output_filter(process.stdout, sys.stdout),
)
# process.communicate() will have no data to read but will close the
# pipes that are implemented in C, whereas process.wait() will not
await process.communicate()
def main():
asyncio.run(run_command(["python", "sample_process.py"]))
if __name__ == "__main__":
main()