异步迭代,如何在等待任务完成的同时进入下一步迭代?

asynchronous iteration, how to move to next step of iteration while waiting for a task to complete?

我正在尝试编写一个迭代器,它在等待 IO 绑定任务时继续进行迭代中的下一步。粗略地演示我想在代码中做什么

for i in iterable:
    await io_bound_task()   # move on to next step in iteration
    # do more stuff when task is complete

我最初尝试运行一个简单的for循环,其中sleep模拟一个IO绑定任务

import asyncio
import random


async def main() -> None:
    for i in range(3):
        print(f"starting task {i}")
        result = await io_bound_task(i)
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

此处代码运行同步输出

starting task 0
finished task 0
starting task 1
finished task 1
starting task 2
finished task 2

我认为这是因为 for 循环阻塞了。所以我认为异步 for 循环是继续进行的方式吗?所以我尝试使用异步迭代器

from __future__ import annotations

import asyncio
import random


class AsyncIterator:
    def __init__(self, max_value: int) -> None:
        self.max_value = max_value
        self.count = 0

    def __aiter__(self) -> AsyncIterator:
        return self

    async def __anext__(self) -> int:
        if self.count == self.max_value:
            raise StopAsyncIteration
        self.count += 1
        return self.count


async def main() -> None:
    async for i in AsyncIterator(3):
        print(f"starting task {i}")
        result = await io_bound_task(i)
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

但这似乎也 运行 同步并导致输出

starting task 1
finished task 1
starting task 2
finished task 2
starting task 3
finished task 3

每次。所以我认为异步迭代器没有按照我假设的那样做?在这一点上我被卡住了。我对异步迭代器的理解有问题吗?有人可以给我一些关于如何实现我想要做的事情的指示吗?

我是异步工作的新手,如果我做了一些愚蠢的事情,我深表歉意。任何帮助表示赞赏。谢谢。

如果这是一个相关的细节,我正在 python 3.8.10

您正在寻找的东西称为任务,可以使用 asyncio.create_task 函数创建。您尝试过的所有方法都涉及等待协程 io_bound_task(i),而 await 的意思是“在继续之前等待它完成”。如果您将协程包装在任务中,那么它将 运行 在后台运行,而不是您必须等待它完成才能继续。

这是使用任务的代码版本:

import asyncio
import random


async def main() -> None:
    tasks = []
    for i in range(3):
        print(f"starting task {i}")
        tasks.append(asyncio.create_task(io_bound_task(i)))

    for task in tasks:
        result = await task
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

输出:

starting task 0
starting task 1
starting task 2
finished task 0
finished task 1
finished task 2

您还可以使用 asyncio.gather(如果您在继续之前需要所有结果)或 asyncio.wait 来等待多个任务,而不是循环。例如,如果任务 2 在任务 0 之前完成并且您不想等待任务 0,您可以这样做:

async def main() -> None:
    pending = []
    for i in range(3):
        print(f"starting task {i}")
        pending.append(asyncio.create_task(io_bound_task(i)))

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            result = await task
            print(f"finished task {result}")