asyncio 子进程可以与 contextmanager 一起使用吗?

Can asyncio subprocess be used with contextmanager?

在 python (3.7+) 中,我正在尝试将一个子进程作为上下文管理器来运行,同时异步流式传输可能大量的标准输出。问题是我似乎无法让 contextmanager 的主体与 stdout 回调异步地同时运行。我曾尝试使用线程,并在线程中设置异步函数,但后来我不知道如何将 Process 对象返回到上下文管理器中。

所以问题是:如何从主线程中的上下文管理器产生一个正在运行的异步 Process 对象?也就是说,在下面的代码中,我希望 open_subprocess() 在子进程仍在运行时就产生(yield)该 Process,而不是等它运行完成之后再产生。

import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    """Forward decoded lines from *stream* to *callback*.

    Reads line by line until the pipe reaches EOF or *proc* has a
    return code set; each line is UTF-decoded and right-stripped
    before being handed to *callback*.
    """
    while proc.returncode is None:
        line = await stream.readline()
        if not line:
            break
        callback(line.decode().rstrip())

async def stream_subprocess(cmd, *args, stdout_callback=print):
    # Spawn the child with stdout piped so read_stream can consume it.
    proc = await asyncio.create_subprocess_exec(
        cmd,
        *args,
        stdout=asyncio.subprocess.PIPE)
    read = read_stream(proc, proc.stdout, stdout_callback)
    # NOTE(review): `await asyncio.wait([read])` is effectively the same as
    # awaiting the coroutine directly, so this line does not return until
    # every line of output has been read -- this is exactly the blocking
    # behaviour the question is about (see the answer text below).
    await asyncio.wait([read])
    return proc

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    # Build (but do not yet run) the coroutine that spawns and streams
    # the subprocess.
    proc_coroutine = stream_subprocess(
        cmd,
        *args,
        stdout_callback=stdout_callback)
    # The following blocks until proc has finished
    # I would like to yield proc while it is running
    proc = asyncio.run(proc_coroutine)
    yield proc
    # By this point the process has already exited, so terminate() is at
    # best a no-op -- presumably it may raise ProcessLookupError on some
    # platforms; TODO confirm.
    proc.terminate()

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        # Tag each line so subprocess output is distinguishable from the
        # script's own prints.
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc:
        # The following code only runs after proc completes
        # but I would expect these print statements to
        # be interleaved with the output from the subprocess
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            time.sleep(1)

    print(f'RETURN CODE: {proc.returncode}')

使用 @contextlib.asynccontextmanager 和 Process.wait() 协程的方法(等待子进程终止,设置并返回 returncode 属性):

import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    """Pump decoded, right-stripped lines from *stream* into *callback*.

    Terminates on EOF or once the process has exited (returncode set).
    """
    while True:
        if proc.returncode is not None:
            break
        data = await stream.readline()
        if not data:
            break
        callback(data.decode().rstrip())


async def stream_subprocess(cmd, *args, stdout_callback=print):
    """Launch *cmd* with *args* and pump its stdout lines into *stdout_callback*.

    Returns the Process object once the stream has been fully consumed.
    """
    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_exec(cmd, *args, stdout=pipe)
    await read_stream(process, process.stdout, stdout_callback)
    return process


@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    """Async context manager yielding the subprocess after its output is read.

    On exit the process is awaited so that ``returncode`` is populated.

    Fix: ``proc`` is now bound *before* entering the ``try`` block.  In the
    original layout, a failure inside stream_subprocess (e.g. command not
    found) reached ``finally: await proc.wait()`` with ``proc`` unbound,
    raising UnboundLocalError and masking the real exception.
    """
    proc = await stream_subprocess(cmd, *args, stdout_callback=stdout_callback)
    try:
        yield proc
    finally:
        # Reap the child so returncode is set for the caller.
        await proc.wait()

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        # Prefix subprocess output so it is distinguishable in the console.
        print('STDOUT:', data)


    async def main():
        async with open_subprocess('ping', '-c', '4', 'localhost',
                                   stdout_callback=stdout_callback) as proc:
            # The following code only runs after proc completes
            # NOTE(review): time.sleep() blocks the event loop; inside a
            # coroutine `await asyncio.sleep(1)` is the non-blocking form.
            for i in range(2):
                print(f'RUNNING SUBPROCESS {proc.pid}...')
                time.sleep(1)

        print(f'RETURN CODE: {proc.returncode}')

    asyncio.run(main())

示例运行输出:

STDOUT: PING localhost (127.0.0.1): 56 data bytes
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.048 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=1 ttl=64 time=0.074 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=2 ttl=64 time=0.061 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=3 ttl=64 time=0.067 ms
STDOUT: 
STDOUT: --- localhost ping statistics ---
STDOUT: 4 packets transmitted, 4 packets received, 0.0% packet loss
STDOUT: round-trip min/avg/max/stddev = 0.048/0.062/0.074/0.010 ms
RUNNING SUBPROCESS 35439...
RUNNING SUBPROCESS 35439...
RETURN CODE: 0

Process finished with exit code 0

Asyncio 通过暂停任何看起来可能会阻塞的东西来提供并行执行。为此,所有代码都必须在回调或 coroutines 内,并避免调用 time.sleep() 等阻塞函数。除此之外,您的代码还有一些其他问题,例如 await asyncio.wait([x]) 等同于 await x,这意味着 open_subprocess 在所有流读取完成之前不会产生。

构建代码的正确方法是将顶层代码移至 async def 并使用异步上下文管理器。例如:

import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    """Deliver each decoded line of *stream* to *callback* until EOF
    or until the process has exited."""
    while proc.returncode is None:
        raw = await stream.readline()
        if not raw:
            break
        callback(raw.decode().rstrip())

@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    """Yield a *running* subprocess while a background task streams its stdout.

    On normal exit of the with-body, terminates the process if it is still
    alive and waits for it, then drains the reader task.
    """
    proc = await asyncio.create_subprocess_exec(
        cmd, *args, stdout=asyncio.subprocess.PIPE)
    # Fix: keep a reference to the task.  The event loop holds only weak
    # references to tasks, so a bare create_task(...) result may be
    # garbage-collected before the reader finishes (documented asyncio pitfall).
    reader = asyncio.create_task(read_stream(proc, proc.stdout, stdout_callback))
    yield proc
    if proc.returncode is None:
        proc.terminate()
        await proc.wait()
    # Ensure any buffered output has been delivered to the callback before
    # leaving the context.
    await reader

async def main():
    """Demonstrate open_subprocess: the prints interleave with ping output."""
    def stdout_callback(data):
        print('STDOUT:', data)

    async with open_subprocess('ping', '-c', '4', 'localhost',
                               stdout_callback=stdout_callback) as proc:
        # asyncio.sleep (not time.sleep) keeps the event loop free to run
        # the stdout reader task concurrently.
        for _ in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            await asyncio.sleep(1)

    print(f'RETURN CODE: {proc.returncode}')

asyncio.run(main())

如果您坚持混合同步和异步代码,则需要通过在单独线程中运行 asyncio 事件循环将两者完全分开。那么你的主线程将无法直接访问像 proc 这样的 asyncio 对象,因为它们不是线程安全的。您需要始终使用 call_soon_threadsafe 和 run_coroutine_threadsafe 与事件循环通信。

这种方法很复杂,需要线程间通信和摆弄事件循环,所以除了作为学习练习外,我不推荐这种方法。更不用说,如果您正在使用另一个线程,则根本不需要使用 asyncio——您可以直接在另一个线程中发出同步调用。但话虽如此,这里有一个可能的实现:

import asyncio
import contextlib
import concurrent.futures
import threading

async def read_stream(proc, stream, callback):
    """Feed decoded lines from *stream* to *callback* while *proc* is alive."""
    while proc.returncode is None:
        chunk = await stream.readline()
        if not chunk:
            return
        callback(chunk.decode().rstrip())

async def stream_subprocess(cmd, *args, proc_data_future, stdout_callback=print):
    """Spawn *cmd*, publish ``{'proc', 'pid'}`` through *proc_data_future*,
    then stream its stdout.

    A spawn failure is reported both ways: via the future (so the waiting
    thread unblocks) and by re-raising (so the event loop sees it too).
    """
    try:
        process = await asyncio.create_subprocess_exec(
            cmd, *args, stdout=asyncio.subprocess.PIPE)
    except Exception as exc:
        proc_data_future.set_exception(exc)
        raise
    proc_data_future.set_result({'proc': process, 'pid': process.pid})
    await read_stream(process, process.stdout, stdout_callback)
    return process

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    """Synchronous context manager driving the subprocess on a private
    asyncio event loop that runs in a background thread.

    Yields a plain dict (``{'proc': ..., 'pid': ...}``) rather than the
    Process itself, because asyncio objects are not thread-safe; every
    interaction with the loop goes through *_threadsafe calls.
    """
    loop = asyncio.new_event_loop()
    # needed to use asyncio.subprocess outside the main thread
    # NOTE(review): asyncio.get_child_watcher() is deprecated since
    # Python 3.12 -- this recipe is tied to older Python versions.
    asyncio.get_child_watcher().attach_loop(loop)
    threading.Thread(target=loop.run_forever).start()
    # concurrent.futures.Future is thread-safe (unlike asyncio.Future), so
    # the main thread can block on .result() below.
    proc_data_future = concurrent.futures.Future()
    loop.call_soon_threadsafe(
        loop.create_task,
        stream_subprocess(cmd, *args,
                          proc_data_future=proc_data_future,
                          stdout_callback=stdout_callback))
    # Blocks until the subprocess has actually been spawned in the loop thread.
    proc_data = proc_data_future.result()
    yield proc_data
    # NOTE(review): there is no try/finally here -- if the with-body raises,
    # the loop thread is left running and never joined; consider wrapping
    # the cleanup below in finally.
    async def terminate(proc):
        if proc.returncode is None:
            proc.terminate()
            await proc.wait()
    asyncio.run_coroutine_threadsafe(terminate(proc_data['proc']), loop).result()
    # Copy the returncode out so the caller can still read it after the
    # loop has been stopped.
    proc_data['returncode'] = proc_data['proc'].returncode
    loop.call_soon_threadsafe(loop.stop)

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        # Called from the event-loop thread; print is safe here.
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc_data:
        # time.sleep() is fine in this variant: asyncio runs in the other
        # thread, so blocking the main thread does not stall the output.
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc_data["pid"]}...')
            time.sleep(1)

    print(f'RETURN CODE: {proc_data["returncode"]}')