如何正确使用 concurrent.futures 和 asyncio

How to properly use concurrent.futures with asyncio

我正在制作一个 FastAPI 应用程序的原型,其端点将使用子进程模块启动长 运行 进程。显而易见的解决方案是使用 concurrent.futures 和 ProcessPoolExecutor,但是我无法获得我想要的行为。代码示例:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import subprocess as sb
import time
import random

pool = ProcessPoolExecutor(5)

def long_task(s):
    print("started")
    time.sleep(random.randrange(5, 15))
    sb.check_output(["touch", str(s)])
    print("done")

async def async_task():
    loop = asyncio.get_event_loop()
    print("started")
    tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
    while True:
        print("in async task")
        done, _ = await asyncio.wait(tasks, timeout=1)
        for task in done:
            await task
        await asyncio.sleep(1)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_task())


if __name__ == "__main__":
    main()

从表面上看,此示例工作正常,但生成的进程在执行完成后不会停止 - 我在 ps aux | grep python 中看到了所有 python 个进程。等待完成的任务不应该停止吗?最后我不太关心执行的结果,它应该在后台发生并干净地退出——没有任何挂起的进程。

您必须在使用完 ProcessPool 后关闭它,方法是显式调用其 shutdown() 方法,或在 ContextManager 中使用它。我使用了 ContextManager 方法。

我不知道subprocess.check_output是做什么的,所以我把它注释掉了。

我还用对 asyncio.gather 的一次调用替换了你的无限循环,这将在执行器完成之前停止。

我在 Windows,所以为了观察 creation/deletion 进程,我查看了 Windows 任务管理器。该程序创建 5 个子进程,并在 ProcessPool 上下文管理器退出时再次关闭它们。

import asyncio
from concurrent.futures import ProcessPoolExecutor
# import subprocess as sb
import time
import random

def long_task(s):
    print("started")
    time.sleep(random.randrange(5, 15))
    # sb.check_output(["touch", str(s)])
    print("done", s)

async def async_task():
    loop = asyncio.get_event_loop()
    print("started")
    with ProcessPoolExecutor(5) as pool:
        tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
        await asyncio.gather(*tasks)
    print("Completely done")

def main():
    asyncio.run(async_task())

if __name__ == "__main__":
    main()