为任何可迭代对象创建一个异步迭代器
Creating an async iterator for any iterable
我正在创建一个对外部 API 进行多次调用的项目。这些 API 调用是在 class 个实例的方法中进行的。我正在尝试制作一个通用函数,该函数接受这些对象的可迭代对象并为它们生成一个异步迭代器。这个异步迭代器将用于异步地进行所有这些外部 API 调用 运行。
然而,根据我在下面的尝试,执行时间仍然随着列表的长度线性增加。
async def async_iterator(iterable: Iterable[Any]) -> AsyncIterator[Any]:
for i in iterable:
yield i
async for object in async_generator(iterable=list_of_objects):
await object.make_time_consuming_api_call()
# do other work on the object
await update_in_database(object=object)
如何异步迭代任何对象列表?
由于您正在等待 object.make_time_consuming_api_call()
,它会等待每次调用完成,然后才能进行下一次迭代 运行。您可以在提交所有调用后等待它,例如 asyncio.create_task
:
async def async_iterator(iterable: Iterable[Any]) -> AsyncIterator[Any]:
for i in iterable:
yield i
async def main():
tasks = list()
async for object in async_generator(iterable=list_of_objects):
tasks.append(asyncio.create_task(object.make_time_consuming_api_call()))
# do other work on the object
for task, object in zip(tasks, list_of_objects):
await task
await update_in_database(object=object)
在这种情况下,您甚至不需要创建 async_iterator
:
async def main():
tasks = list()
for object in list_of_objects:
tasks.append(asyncio.create_task(object.make_time_consuming_api_call()))
或者更简洁:
async def main():
results = await asyncio.gather(*(object.make_time_consuming_api_call() for object in list_of_objects))
# Added this to store the result as an attribute (see comments)
for result, object in zip(results, list_of_objects):
object.attribute = result
我正在创建一个对外部 API 进行多次调用的项目。这些 API 调用是在 class 个实例的方法中进行的。我正在尝试制作一个通用函数,该函数接受这些对象的可迭代对象并为它们生成一个异步迭代器。这个异步迭代器将用于异步地进行所有这些外部 API 调用 运行。
然而,根据我在下面的尝试,执行时间仍然随着列表的长度线性增加。
async def async_iterator(iterable: Iterable[Any]) -> AsyncIterator[Any]:
for i in iterable:
yield i
async for object in async_generator(iterable=list_of_objects):
await object.make_time_consuming_api_call()
# do other work on the object
await update_in_database(object=object)
如何异步迭代任何对象列表?
由于您正在等待 object.make_time_consuming_api_call()
,它会等待每次调用完成,然后才能进行下一次迭代 运行。您可以在提交所有调用后等待它,例如 asyncio.create_task
:
async def async_iterator(iterable: Iterable[Any]) -> AsyncIterator[Any]:
for i in iterable:
yield i
async def main():
tasks = list()
async for object in async_generator(iterable=list_of_objects):
tasks.append(asyncio.create_task(object.make_time_consuming_api_call()))
# do other work on the object
for task, object in zip(tasks, list_of_objects):
await task
await update_in_database(object=object)
在这种情况下,您甚至不需要创建 async_iterator
:
async def main():
tasks = list()
for object in list_of_objects:
tasks.append(asyncio.create_task(object.make_time_consuming_api_call()))
或者更简洁:
async def main():
results = await asyncio.gather(*(object.make_time_consuming_api_call() for object in list_of_objects))
# Added this to store the result as an attribute (see comments)
for result, object in zip(results, list_of_objects):
object.attribute = result