如何使用 asyncio 复杂管理 shell 进程?

how to complex manage shell processes with asyncio?

我想用 python 的 asyncio 模块跟踪守护进程的重启过程。所以我需要 运行 shell 命令 tail -f -n 0 /var/log/daemon.log 并分析它的输出,比方说,service daemon restart 在后台执行。服务重启命令完成执行后,守护进程继续写入日志并报告其内部检查。跟踪进程读取检查信息并根据其内部逻辑报告重启是否成功。

import asyncio
from asyncio.subprocess import PIPE, STDOUT

async def track():
    output = []
    process = await asyncio.create_subprocess_shell(
        'tail -f -n0 ~/daemon.log',
        stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    while True:
        line = await process.stdout.readline()
        if line.decode() == 'reboot starts\n':
            output.append(line)
            break
    while True:
        line = await process.stdout.readline()
        if line.decode() == '1st check completed\n':
            output.append(line)
            break
    return output

async def reboot():
    lines = [
        '...',
        '...',
        'reboot starts',
        '...',
        '1st check completed',
        '...',
    ]
    p = await asyncio.create_subprocess_shell(
        (
            'echo "rebooting"; '
            'for line in {}; '
                'do echo $line >> ~/daemon.log; sleep 1; '
            'done; '
            'echo "rebooted";'
        ).format(' '.join('"{}"'.format(l) for l in lines)),
        stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    return (await p.communicate())[0].splitlines()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(
        asyncio.ensure_future(track()),
        asyncio.ensure_future(reboot())
    ))
    loop.close()

这段代码是我发现 运行 两个并行的唯一方法。但是如何 运行 track() 严格地在 reboot 之前才不会错过日志中任何可能的输出?以及如何检索两个协程的 return 值?

But how to run track() strictly before reboot to not miss any possible output in log?

您可以 await 在 运行 第二个之前创建第一个子流程。

And how to retrieve return values of both coroutines?

asyncio.gather returns 聚合结果。

示例:

async def main():
    process_a = await asyncio.create_subprocess_shell([...])
    process_b = await asyncio.create_subprocess_shell([...])
    return await asyncio.gather(monitor_a(process_a), monitor_b(process_b))

loop = asyncio.get_event_loop()
result_a, result_b = loop.run_until_complete(main())