使用 asyncio (Python 3.4+) 异步接收 long 运行 shell 命令的输出?

Asynchronously receive output from long running shell commands with asyncio (Python 3.4+)?

我正在尝试弄清楚如何简单地以非阻塞方式启动一些长 运行 shell 命令,并在它们完成时异步处理它们的输出,按照他们完成的顺序,即使这与他们开始的顺序不同,使用 Python 3.4 及更高版本中可用的 asyncio python 库。

我找不到这样做的简单示例,即使在 asyncio documentation itself 中也是如此,这似乎也很低级。

create_subprocess_shell is what you are looking for. It will return a Process instance, which you can wait() on, or communicate() 与.

使用 get_lines() coroutines, to get shell commands output asynchronously and pass the coroutines to asyncio.as_completed(),按完成顺序获取结果:

#!/usr/bin/env python3.5
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (await p.communicate())[0].splitlines()

async def main():
    # get commands output concurrently
    coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                       .format(i=i, e=sys.executable))
             for i in reversed(range(5))]
    for f in asyncio.as_completed(coros): # print in the order they finish
        print(await f)


if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

我的情况和你一模一样。就我而言,我在多个回购目录中使用 运行 多个 git fetch 命令。

在第一次试用中,代码如下所示(cmds['git', 'fetch']):

async def run_async(path: str, cmds: List[str]):
    process = await asyncio.create_subprocess_exec(*cmds, cwd=path)
    await process.wait()

此函数适用于一个存储库,调用者为多个存储库创建任务并运行一个事件loop来完成它们。

虽然程序运行并且磁盘上的结果是正确的,但来自不同存储库的 fetch 输出是交错的。原因是 await process.wait() 可以在任何时间 IO 块(文件、网络等)时将控制权交还给调用者(循环调度程序)。

一个简单的更改即可解决问题:

async def run_async(path: str, cmds: List[str]):
    """
    Run `cmds` asynchronously in `path` directory
    """
    process = await asyncio.create_subprocess_exec(
        *cmds, stdout=asyncio.subprocess.PIPE, cwd=path)
    stdout, _ = await process.communicate()
    stdout and print(stdout.decode())

这里的基本原理是重定向 stdout 以便它在一个地方。就我而言,我只是将其打印出来。如果你需要输出,你可以return最后。

此外,打印顺序可能与开始顺序不同,这对我来说没问题。

源代码为here on github。为了提供一些上下文,该项目是一个命令行工具,用于管理多个 git 存储库,它委托 git 从任何工作目录执行命令。不到200行代码,应该很容易阅读。