为任何可迭代对象创建一个异步迭代器

Creating an async iterator for any iterable

我正在创建一个对外部 API 进行多次调用的项目。这些 API 调用是在 class 个实例的方法中进行的。我正在尝试制作一个通用函数,该函数接受这些对象的可迭代对象并为它们生成一个异步迭代器。这个异步迭代器将用于异步地进行所有这些外部 API 调用 运行。

然而,根据我在下面的尝试,执行时间仍然随着列表的长度线性增加。

async def async_iterator(iterable: Iterable[Any]) -> AsyncIterator[Any]:
    for i in iterable:
        yield i

async for object in async_generator(iterable=list_of_objects):
    await object.make_time_consuming_api_call()
    # do other work on the object
    await update_in_database(object=object)

如何异步迭代任何对象列表?

由于您正在等待 object.make_time_consuming_api_call(),它会等待每次调用完成,然后才能进行下一次迭代 运行。您可以在提交所有调用后等待它,例如 asyncio.create_task:

async def async_iterator(iterable: Iterable[Any]) -> AsyncIterator[Any]:
    for i in iterable:
        yield i

async def main():
    tasks = list()
    async for object in async_generator(iterable=list_of_objects):
        tasks.append(asyncio.create_task(object.make_time_consuming_api_call()))

    # do other work on the object
    for task, object in zip(tasks, list_of_objects):
        await task
        await update_in_database(object=object)

在这种情况下,您甚至不需要创建 async_iterator:

async def main():
    tasks = list()
    for object in list_of_objects:
        tasks.append(asyncio.create_task(object.make_time_consuming_api_call()))

或者更简洁:

async def main():
    results = await asyncio.gather(*(object.make_time_consuming_api_call() for object in list_of_objects))

    # Added this to store the result as an attribute (see comments)
    for result, object in zip(results, list_of_objects):
        object.attribute = result