运行 里面的并发任务 asyncio.as_completed

running concurrent tasks inside asyncio.as_completed

有人可以向我解释为什么下面的代码不起作用,以及我如何重构它,让它起作用。

import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def io_bound(device):
    sleep(5)
    return 5


async def sleepy_time(result):
    await asyncio.sleep(5)
    print(result)


async def main(loop):
    futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]

    for f in asyncio.as_completed(futures):
        result = await f
        task = asyncio.create_task(sleepy_time(result))
        await task


loop = asyncio.get_event_loop()

executor = ThreadPoolExecutor(
    max_workers=3,
)

try:
    loop.run_until_complete(main(loop))
finally:
    loop.close()

在 asyncio.as_completed 节中,如果我只打印结果,它会同时打印 3 个结果 - 这符合我的预期。但是,当我等待另一项任务时——我希望 asyncio 会启动该任务并进入下一个未来,但事实并非如此;它阻塞直到 asyncio.sleep(5) 有 运行 然后在下一个未来移动。

我已经测试过将两个任务放在 as_completed 节中并且它们 运行 同时进行。

如何让上面的代码运行第二组任务并发?

将运行第二组任务并发,做成一个任务集合,然后用asyncio.wait()等待。

async def main(loop):
    futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]

    tasks = []
    for f in asyncio.as_completed(futures):
        result = await f
        tasks.append(asyncio.create_task(sleepy_time(result)))

    await asyncio.wait(tasks)

当你 await 异步函数中的某些东西时,它会阻止函数体的进一步执行,直到 await 解决。

虽然 await asyncio.sleep() 也允许事件循环中的其他任务 运行.

来自 python docs:

sleep() always suspends the current task, allowing other tasks to run.

现在asyncio.create_task():

Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.

这意味着创建的任务将 运行 通过事件循环,无论您是否等待它。

你可以通过切换来测试

await asyncio.wait(tasks)

await asyncio.sleep(10)

此外,如果事件循环在任务可能 运行 之前停止,您会得到“任务已被销毁但它正在等待处理!”错误。

如果事件循环没有机会 运行 任务,那可能会发生。例如,如果在您的 async def main() 中您使用 create_task() 创建了任务,但根本没有等待任何东西.