如何正确使用 concurrent.futures 和 asyncio
How to properly use concurrent.futures with asyncio
我正在制作一个 FastAPI 应用程序的原型,其端点将使用子进程模块启动长 运行 进程。显而易见的解决方案是使用 concurrent.futures 和 ProcessPoolExecutor,但是我无法获得我想要的行为。代码示例:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import subprocess as sb
import time
import random
pool = ProcessPoolExecutor(5)
def long_task(s):
print("started")
time.sleep(random.randrange(5, 15))
sb.check_output(["touch", str(s)])
print("done")
async def async_task():
loop = asyncio.get_event_loop()
print("started")
tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
while True:
print("in async task")
done, _ = await asyncio.wait(tasks, timeout=1)
for task in done:
await task
await asyncio.sleep(1)
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(async_task())
if __name__ == "__main__":
main()
从表面上看,此示例工作正常,但生成的进程在执行完成后不会停止 - 我在 ps aux | grep python
中看到了所有 python 个进程。等待完成的任务不应该停止吗?最后我不太关心执行的结果,它应该在后台发生并干净地退出——没有任何挂起的进程。
您必须在使用完 ProcessPool 后关闭它,方法是显式调用其 shutdown()
方法,或在 ContextManager 中使用它。我使用了 ContextManager 方法。
我不知道subprocess.check_output是做什么的,所以我把它注释掉了。
我还用对 asyncio.gather 的一次调用替换了你的无限循环,这将在执行器完成之前停止。
我在 Windows,所以为了观察 creation/deletion 进程,我查看了 Windows 任务管理器。该程序创建 5 个子进程,并在 ProcessPool 上下文管理器退出时再次关闭它们。
import asyncio
from concurrent.futures import ProcessPoolExecutor
# import subprocess as sb
import time
import random
def long_task(s):
print("started")
time.sleep(random.randrange(5, 15))
# sb.check_output(["touch", str(s)])
print("done", s)
async def async_task():
loop = asyncio.get_event_loop()
print("started")
with ProcessPoolExecutor(5) as pool:
tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
await asyncio.gather(*tasks)
print("Completely done")
def main():
asyncio.run(async_task())
if __name__ == "__main__":
main()
我正在制作一个 FastAPI 应用程序的原型,其端点将使用子进程模块启动长 运行 进程。显而易见的解决方案是使用 concurrent.futures 和 ProcessPoolExecutor,但是我无法获得我想要的行为。代码示例:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import subprocess as sb
import time
import random
pool = ProcessPoolExecutor(5)
def long_task(s):
print("started")
time.sleep(random.randrange(5, 15))
sb.check_output(["touch", str(s)])
print("done")
async def async_task():
loop = asyncio.get_event_loop()
print("started")
tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
while True:
print("in async task")
done, _ = await asyncio.wait(tasks, timeout=1)
for task in done:
await task
await asyncio.sleep(1)
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(async_task())
if __name__ == "__main__":
main()
从表面上看,此示例工作正常,但生成的进程在执行完成后不会停止 - 我在 ps aux | grep python
中看到了所有 python 个进程。等待完成的任务不应该停止吗?最后我不太关心执行的结果,它应该在后台发生并干净地退出——没有任何挂起的进程。
您必须在使用完 ProcessPool 后关闭它,方法是显式调用其 shutdown()
方法,或在 ContextManager 中使用它。我使用了 ContextManager 方法。
我不知道subprocess.check_output是做什么的,所以我把它注释掉了。
我还用对 asyncio.gather 的一次调用替换了你的无限循环,这将在执行器完成之前停止。
我在 Windows,所以为了观察 creation/deletion 进程,我查看了 Windows 任务管理器。该程序创建 5 个子进程,并在 ProcessPool 上下文管理器退出时再次关闭它们。
import asyncio
from concurrent.futures import ProcessPoolExecutor
# import subprocess as sb
import time
import random
def long_task(s):
print("started")
time.sleep(random.randrange(5, 15))
# sb.check_output(["touch", str(s)])
print("done", s)
async def async_task():
loop = asyncio.get_event_loop()
print("started")
with ProcessPoolExecutor(5) as pool:
tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
await asyncio.gather(*tasks)
print("Completely done")
def main():
asyncio.run(async_task())
if __name__ == "__main__":
main()