asyncio 子进程可以与 contextmanager 一起使用吗?
Can asyncio subprocess be used with contextmanager?
在 Python (3.7+) 中,我正在尝试把一个子进程作为上下文管理器来运行,同时通过 stdout 回调异步地流式处理可能大量的标准输出。问题是,我无法让 contextmanager 的主体与 stdout 回调异步并发地运行。我曾尝试使用线程,在其中运行异步函数,但那样我就不知道如何把 Process 对象传回到上下文管理器中。
所以问题是:如何在子进程运行期间,从主线程中的上下文管理器 yield 出异步 Process 对象?也就是说,在下面的代码中,我希望 open_subprocess() 在子进程运行完成之前,就 yield 出已经启动且正在运行的 Process。
import asyncio
import contextlib
async def read_stream(proc, stream, callback):
while proc.returncode is None:
data = await stream.readline()
if data:
callback(data.decode().rstrip())
else:
break
async def stream_subprocess(cmd, *args, stdout_callback=print):
proc = await asyncio.create_subprocess_exec(
cmd,
*args,
stdout=asyncio.subprocess.PIPE)
read = read_stream(proc, proc.stdout, stdout_callback)
await asyncio.wait([read])
return proc
@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
proc_coroutine = stream_subprocess(
cmd,
*args,
stdout_callback=stdout_callback)
# The following blocks until proc has finished
# I would like to yield proc while it is running
proc = asyncio.run(proc_coroutine)
yield proc
proc.terminate()
if __name__ == '__main__':
import time
def stdout_callback(data):
print('STDOUT:', data)
with open_subprocess('ping', '-c', '4', 'localhost',
stdout_callback=stdout_callback) as proc:
# The following code only runs after proc completes
# but I would expect these print statements to
# be interleaved with the output from the subprocess
for i in range(2):
print(f'RUNNING SUBPROCESS {proc.pid}...')
time.sleep(1)
print(f'RETURN CODE: {proc.returncode}')
使用 @contextlib.asynccontextmanager 和 Process.wait() 协程的方法(Process.wait() 会等待子进程终止,并设置和返回 returncode 属性):
import asyncio
import contextlib


async def read_stream(proc, stream, callback):
    """Forward decoded stdout lines from *stream* to *callback* until EOF
    or until *proc* exits."""
    while proc.returncode is None:
        data = await stream.readline()
        if not data:
            break
        callback(data.decode().rstrip())


async def stream_subprocess(cmd, *args, stdout_callback=print):
    """Spawn *cmd* and stream its stdout; returns the Process object."""
    proc = await asyncio.create_subprocess_exec(cmd, *args,
                                                stdout=asyncio.subprocess.PIPE)
    await read_stream(proc, proc.stdout, stdout_callback)
    return proc


@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    """Async context manager; waits for the subprocess on exit so that
    returncode is set before control returns to the caller."""
    # Initialise first: if spawning fails, the original code raised
    # NameError in the finally block and masked the real exception.
    proc = None
    try:
        proc = await stream_subprocess(cmd, *args, stdout_callback=stdout_callback)
        yield proc
    finally:
        if proc is not None:
            await proc.wait()


if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    async def main():
        async with open_subprocess('ping', '-c', '4', 'localhost',
                                   stdout_callback=stdout_callback) as proc:
            # The following code only runs after proc completes
            for i in range(2):
                print(f'RUNNING SUBPROCESS {proc.pid}...')
                time.sleep(1)
        print(f'RETURN CODE: {proc.returncode}')

    asyncio.run(main())
示例运行输出:
STDOUT: PING localhost (127.0.0.1): 56 data bytes
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.048 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=1 ttl=64 time=0.074 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=2 ttl=64 time=0.061 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=3 ttl=64 time=0.067 ms
STDOUT:
STDOUT: --- localhost ping statistics ---
STDOUT: 4 packets transmitted, 4 packets received, 0.0% packet loss
STDOUT: round-trip min/avg/max/stddev = 0.048/0.062/0.074/0.010 ms
RUNNING SUBPROCESS 35439...
RUNNING SUBPROCESS 35439...
RETURN CODE: 0
Process finished with exit code 0
Asyncio 通过暂停任何看起来可能会阻塞的东西来提供并行执行。为此,所有代码都必须在回调或 coroutines 内,并避免调用 time.sleep()
等阻塞函数。除此之外,您的代码还有一些其他问题,例如 await asyncio.wait([x])
等同于 await x
,这意味着 open_subprocess
在所有流读取完成之前不会产生。
构建代码的正确方法是将顶层代码移至 async def
并使用异步上下文管理器。例如:
import asyncio
import contextlib


async def read_stream(proc, stream, callback):
    """Forward decoded stdout lines from *stream* to *callback* until EOF
    or until *proc* exits."""
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break


@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    """Start *cmd*, stream its stdout in a background task, and yield the
    still-running Process immediately; terminate and reap it on exit."""
    proc = await asyncio.create_subprocess_exec(
        cmd, *args, stdout=asyncio.subprocess.PIPE)
    # Keep a reference: fire-and-forget tasks can be garbage-collected
    # before they finish.
    reader = asyncio.create_task(read_stream(proc, proc.stdout, stdout_callback))
    try:
        yield proc
    finally:
        # try/finally ensures cleanup even if the with-body raises.
        if proc.returncode is None:
            proc.terminate()
        await proc.wait()
        # Drain the reader so no already-buffered output is lost.
        await reader


async def main():
    def stdout_callback(data):
        print('STDOUT:', data)

    async with open_subprocess('ping', '-c', '4', 'localhost',
                               stdout_callback=stdout_callback) as proc:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            await asyncio.sleep(1)
    print(f'RETURN CODE: {proc.returncode}')


if __name__ == '__main__':
    # Guarded: the original ran asyncio.run(main()) at import time.
    asyncio.run(main())
如果您坚持混合同步和异步代码,则需要通过 运行 单独线程中的异步事件循环将它们完全分开。那么你的主线程将无法直接访问像 proc
这样的 asyncio 对象,因为它们不是线程安全的。您需要始终使用 call_soon_threadsafe
和 run_coroutine_threadsafe
与事件循环通信。
这种方法很复杂,需要线程间通信和摆弄事件循环,所以除了作为学习练习外,我不推荐这种方法。更不用说,如果您正在使用另一个线程,则根本不需要使用 asyncio——您可以直接在另一个线程中发出同步调用。但话虽如此,这里有一个可能的实现:
import asyncio
import contextlib
import concurrent.futures
import threading


async def read_stream(proc, stream, callback):
    # Forward each decoded stdout line to *callback* until EOF or
    # until the process has a returncode.
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break


async def stream_subprocess(cmd, *args, proc_data_future, stdout_callback=print):
    # Spawn the subprocess and publish {'proc', 'pid'} through the
    # thread-safe *proc_data_future* so the waiting main thread can proceed.
    try:
        proc = await asyncio.create_subprocess_exec(
            cmd, *args, stdout=asyncio.subprocess.PIPE)
    except Exception as e:
        # Propagate spawn failures to the main thread as well.
        proc_data_future.set_exception(e)
        raise
    proc_data_future.set_result({'proc': proc, 'pid': proc.pid})
    await read_stream(proc, proc.stdout, stdout_callback)
    return proc


@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    # Run an asyncio event loop in a background thread and drive the
    # subprocess there; the main thread communicates with the loop only
    # via call_soon_threadsafe / run_coroutine_threadsafe, because
    # asyncio objects are not thread-safe.
    loop = asyncio.new_event_loop()
    # needed to use asyncio.subprocess outside the main thread
    # NOTE(review): asyncio.get_child_watcher() is deprecated since
    # Python 3.12 and removed in 3.14 — confirm the target version.
    asyncio.get_child_watcher().attach_loop(loop)
    threading.Thread(target=loop.run_forever).start()
    proc_data_future = concurrent.futures.Future()
    loop.call_soon_threadsafe(
        loop.create_task,
        stream_subprocess(cmd, *args,
                          proc_data_future=proc_data_future,
                          stdout_callback=stdout_callback))
    # Blocks until the subprocess has actually been spawned (or failed).
    proc_data = proc_data_future.result()
    yield proc_data
    async def terminate(proc):
        # Terminate only if still running; wait() reaps the child and
        # sets returncode.
        if proc.returncode is None:
            proc.terminate()
        await proc.wait()
    asyncio.run_coroutine_threadsafe(terminate(proc_data['proc']), loop).result()
    # Copy the returncode out so the caller never touches the asyncio
    # Process object after the loop stops.
    proc_data['returncode'] = proc_data['proc'].returncode
    loop.call_soon_threadsafe(loop.stop)


if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc_data:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc_data["pid"]}...')
            time.sleep(1)
    print(f'RETURN CODE: {proc_data["returncode"]}')
在 Python (3.7+) 中,我正在尝试把一个子进程作为上下文管理器来运行,同时通过 stdout 回调异步地流式处理可能大量的标准输出。问题是,我无法让 contextmanager 的主体与 stdout 回调异步并发地运行。我曾尝试使用线程,在其中运行异步函数,但那样我就不知道如何把 Process 对象传回到上下文管理器中。
所以问题是:如何在子进程运行期间,从主线程中的上下文管理器 yield 出异步 Process 对象?也就是说,在下面的代码中,我希望 open_subprocess() 在子进程运行完成之前,就 yield 出已经启动且正在运行的 Process。
import asyncio
import contextlib
async def read_stream(proc, stream, callback):
while proc.returncode is None:
data = await stream.readline()
if data:
callback(data.decode().rstrip())
else:
break
async def stream_subprocess(cmd, *args, stdout_callback=print):
proc = await asyncio.create_subprocess_exec(
cmd,
*args,
stdout=asyncio.subprocess.PIPE)
read = read_stream(proc, proc.stdout, stdout_callback)
await asyncio.wait([read])
return proc
@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
proc_coroutine = stream_subprocess(
cmd,
*args,
stdout_callback=stdout_callback)
# The following blocks until proc has finished
# I would like to yield proc while it is running
proc = asyncio.run(proc_coroutine)
yield proc
proc.terminate()
if __name__ == '__main__':
import time
def stdout_callback(data):
print('STDOUT:', data)
with open_subprocess('ping', '-c', '4', 'localhost',
stdout_callback=stdout_callback) as proc:
# The following code only runs after proc completes
# but I would expect these print statements to
# be interleaved with the output from the subprocess
for i in range(2):
print(f'RUNNING SUBPROCESS {proc.pid}...')
time.sleep(1)
print(f'RETURN CODE: {proc.returncode}')
使用 @contextlib.asynccontextmanager 和 Process.wait() 协程的方法(Process.wait() 会等待子进程终止,并设置和返回 returncode 属性):
import asyncio
import contextlib


async def read_stream(proc, stream, callback):
    """Forward decoded stdout lines from *stream* to *callback* until EOF
    or until *proc* exits."""
    while proc.returncode is None:
        data = await stream.readline()
        if not data:
            break
        callback(data.decode().rstrip())


async def stream_subprocess(cmd, *args, stdout_callback=print):
    """Spawn *cmd* and stream its stdout; returns the Process object."""
    proc = await asyncio.create_subprocess_exec(cmd, *args,
                                                stdout=asyncio.subprocess.PIPE)
    await read_stream(proc, proc.stdout, stdout_callback)
    return proc


@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    """Async context manager; waits for the subprocess on exit so that
    returncode is set before control returns to the caller."""
    # Initialise first: if spawning fails, the original code raised
    # NameError in the finally block and masked the real exception.
    proc = None
    try:
        proc = await stream_subprocess(cmd, *args, stdout_callback=stdout_callback)
        yield proc
    finally:
        if proc is not None:
            await proc.wait()


if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    async def main():
        async with open_subprocess('ping', '-c', '4', 'localhost',
                                   stdout_callback=stdout_callback) as proc:
            # The following code only runs after proc completes
            for i in range(2):
                print(f'RUNNING SUBPROCESS {proc.pid}...')
                time.sleep(1)
        print(f'RETURN CODE: {proc.returncode}')

    asyncio.run(main())
示例运行输出:
STDOUT: PING localhost (127.0.0.1): 56 data bytes
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.048 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=1 ttl=64 time=0.074 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=2 ttl=64 time=0.061 ms
STDOUT: 64 bytes from 127.0.0.1: icmp_seq=3 ttl=64 time=0.067 ms
STDOUT:
STDOUT: --- localhost ping statistics ---
STDOUT: 4 packets transmitted, 4 packets received, 0.0% packet loss
STDOUT: round-trip min/avg/max/stddev = 0.048/0.062/0.074/0.010 ms
RUNNING SUBPROCESS 35439...
RUNNING SUBPROCESS 35439...
RETURN CODE: 0
Process finished with exit code 0
Asyncio 通过暂停任何看起来可能会阻塞的东西来提供并行执行。为此,所有代码都必须在回调或 coroutines 内,并避免调用 time.sleep()
等阻塞函数。除此之外,您的代码还有一些其他问题,例如 await asyncio.wait([x])
等同于 await x
,这意味着 open_subprocess
在所有流读取完成之前不会产生。
构建代码的正确方法是将顶层代码移至 async def
并使用异步上下文管理器。例如:
import asyncio
import contextlib


async def read_stream(proc, stream, callback):
    """Forward decoded stdout lines from *stream* to *callback* until EOF
    or until *proc* exits."""
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break


@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    """Start *cmd*, stream its stdout in a background task, and yield the
    still-running Process immediately; terminate and reap it on exit."""
    proc = await asyncio.create_subprocess_exec(
        cmd, *args, stdout=asyncio.subprocess.PIPE)
    # Keep a reference: fire-and-forget tasks can be garbage-collected
    # before they finish.
    reader = asyncio.create_task(read_stream(proc, proc.stdout, stdout_callback))
    try:
        yield proc
    finally:
        # try/finally ensures cleanup even if the with-body raises.
        if proc.returncode is None:
            proc.terminate()
        await proc.wait()
        # Drain the reader so no already-buffered output is lost.
        await reader


async def main():
    def stdout_callback(data):
        print('STDOUT:', data)

    async with open_subprocess('ping', '-c', '4', 'localhost',
                               stdout_callback=stdout_callback) as proc:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            await asyncio.sleep(1)
    print(f'RETURN CODE: {proc.returncode}')


if __name__ == '__main__':
    # Guarded: the original ran asyncio.run(main()) at import time.
    asyncio.run(main())
如果您坚持混合同步和异步代码,则需要通过 运行 单独线程中的异步事件循环将它们完全分开。那么你的主线程将无法直接访问像 proc
这样的 asyncio 对象,因为它们不是线程安全的。您需要始终使用 call_soon_threadsafe
和 run_coroutine_threadsafe
与事件循环通信。
这种方法很复杂,需要线程间通信和摆弄事件循环,所以除了作为学习练习外,我不推荐这种方法。更不用说,如果您正在使用另一个线程,则根本不需要使用 asyncio——您可以直接在另一个线程中发出同步调用。但话虽如此,这里有一个可能的实现:
import asyncio
import contextlib
import concurrent.futures
import threading


async def read_stream(proc, stream, callback):
    # Forward each decoded stdout line to *callback* until EOF or
    # until the process has a returncode.
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break


async def stream_subprocess(cmd, *args, proc_data_future, stdout_callback=print):
    # Spawn the subprocess and publish {'proc', 'pid'} through the
    # thread-safe *proc_data_future* so the waiting main thread can proceed.
    try:
        proc = await asyncio.create_subprocess_exec(
            cmd, *args, stdout=asyncio.subprocess.PIPE)
    except Exception as e:
        # Propagate spawn failures to the main thread as well.
        proc_data_future.set_exception(e)
        raise
    proc_data_future.set_result({'proc': proc, 'pid': proc.pid})
    await read_stream(proc, proc.stdout, stdout_callback)
    return proc


@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    # Run an asyncio event loop in a background thread and drive the
    # subprocess there; the main thread communicates with the loop only
    # via call_soon_threadsafe / run_coroutine_threadsafe, because
    # asyncio objects are not thread-safe.
    loop = asyncio.new_event_loop()
    # needed to use asyncio.subprocess outside the main thread
    # NOTE(review): asyncio.get_child_watcher() is deprecated since
    # Python 3.12 and removed in 3.14 — confirm the target version.
    asyncio.get_child_watcher().attach_loop(loop)
    threading.Thread(target=loop.run_forever).start()
    proc_data_future = concurrent.futures.Future()
    loop.call_soon_threadsafe(
        loop.create_task,
        stream_subprocess(cmd, *args,
                          proc_data_future=proc_data_future,
                          stdout_callback=stdout_callback))
    # Blocks until the subprocess has actually been spawned (or failed).
    proc_data = proc_data_future.result()
    yield proc_data
    async def terminate(proc):
        # Terminate only if still running; wait() reaps the child and
        # sets returncode.
        if proc.returncode is None:
            proc.terminate()
        await proc.wait()
    asyncio.run_coroutine_threadsafe(terminate(proc_data['proc']), loop).result()
    # Copy the returncode out so the caller never touches the asyncio
    # Process object after the loop stops.
    proc_data['returncode'] = proc_data['proc'].returncode
    loop.call_soon_threadsafe(loop.stop)


if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc_data:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc_data["pid"]}...')
            time.sleep(1)
    print(f'RETURN CODE: {proc_data["returncode"]}')