异步迭代,如何在等待任务完成的同时进入下一步迭代?
asynchronous iteration, how to move to next step of iteration while waiting for a task to complete?
我正在尝试编写一个迭代器,它在等待 IO 绑定任务时继续进行迭代中的下一步。粗略地演示我想在代码中做什么
for i in iterable:
await io_bound_task() # move on to next step in iteration
# do more stuff when task is complete
我最初尝试运行一个简单的for
循环,其中sleep
模拟一个IO绑定任务
import asyncio
import random
async def main() -> None:
for i in range(3):
print(f"starting task {i}")
result = await io_bound_task(i)
print(f"finished task {result}")
async def io_bound_task(i: int) -> int:
await asyncio.sleep(random.random())
return i
asyncio.run(main())
此处代码运行同步输出
starting task 0
finished task 0
starting task 1
finished task 1
starting task 2
finished task 2
我认为这是因为 for
循环阻塞了。所以我认为异步 for
循环是继续进行的方式吗?所以我尝试使用异步迭代器
from __future__ import annotations
import asyncio
import random
class AsyncIterator:
def __init__(self, max_value: int) -> None:
self.max_value = max_value
self.count = 0
def __aiter__(self) -> AsyncIterator:
return self
async def __anext__(self) -> int:
if self.count == self.max_value:
raise StopAsyncIteration
self.count += 1
return self.count
async def main() -> None:
async for i in AsyncIterator(3):
print(f"starting task {i}")
result = await io_bound_task(i)
print(f"finished task {result}")
async def io_bound_task(i: int) -> int:
await asyncio.sleep(random.random())
return i
asyncio.run(main())
但这似乎也 运行 同步并导致输出
starting task 1
finished task 1
starting task 2
finished task 2
starting task 3
finished task 3
每次。所以我认为异步迭代器没有按照我假设的那样做?在这一点上我被卡住了。我对异步迭代器的理解有问题吗?有人可以给我一些关于如何实现我想要做的事情的指示吗?
我是异步工作的新手,如果我做了一些愚蠢的事情,我深表歉意。任何帮助表示赞赏。谢谢。
如果这是一个相关的细节,我正在 python 3.8.10
。
您正在寻找的东西称为任务,可以使用 asyncio.create_task
函数创建。您尝试过的所有方法都涉及等待协程 io_bound_task(i)
,而 await 的意思是“在继续之前等待它完成”。如果您将协程包装在任务中,那么它将 运行 在后台运行,而不是您必须等待它完成才能继续。
这是使用任务的代码版本:
import asyncio
import random
async def main() -> None:
tasks = []
for i in range(3):
print(f"starting task {i}")
tasks.append(asyncio.create_task(io_bound_task(i)))
for task in tasks:
result = await task
print(f"finished task {result}")
async def io_bound_task(i: int) -> int:
await asyncio.sleep(random.random())
return i
asyncio.run(main())
输出:
starting task 0
starting task 1
starting task 2
finished task 0
finished task 1
finished task 2
您还可以使用 asyncio.gather
(如果您在继续之前需要所有结果)或 asyncio.wait
来等待多个任务,而不是循环。例如,如果任务 2 在任务 0 之前完成并且您不想等待任务 0,您可以这样做:
async def main() -> None:
pending = []
for i in range(3):
print(f"starting task {i}")
pending.append(asyncio.create_task(io_bound_task(i)))
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
result = await task
print(f"finished task {result}")
我正在尝试编写一个迭代器,它在等待 IO 绑定任务时继续进行迭代中的下一步。粗略地演示我想在代码中做什么
for i in iterable:
await io_bound_task() # move on to next step in iteration
# do more stuff when task is complete
我最初尝试运行一个简单的for
循环,其中sleep
模拟一个IO绑定任务
import asyncio
import random
async def main() -> None:
for i in range(3):
print(f"starting task {i}")
result = await io_bound_task(i)
print(f"finished task {result}")
async def io_bound_task(i: int) -> int:
await asyncio.sleep(random.random())
return i
asyncio.run(main())
此处代码运行同步输出
starting task 0
finished task 0
starting task 1
finished task 1
starting task 2
finished task 2
我认为这是因为 for
循环阻塞了。所以我认为异步 for
循环是继续进行的方式吗?所以我尝试使用异步迭代器
from __future__ import annotations
import asyncio
import random
class AsyncIterator:
def __init__(self, max_value: int) -> None:
self.max_value = max_value
self.count = 0
def __aiter__(self) -> AsyncIterator:
return self
async def __anext__(self) -> int:
if self.count == self.max_value:
raise StopAsyncIteration
self.count += 1
return self.count
async def main() -> None:
async for i in AsyncIterator(3):
print(f"starting task {i}")
result = await io_bound_task(i)
print(f"finished task {result}")
async def io_bound_task(i: int) -> int:
await asyncio.sleep(random.random())
return i
asyncio.run(main())
但这似乎也 运行 同步并导致输出
starting task 1
finished task 1
starting task 2
finished task 2
starting task 3
finished task 3
每次。所以我认为异步迭代器没有按照我假设的那样做?在这一点上我被卡住了。我对异步迭代器的理解有问题吗?有人可以给我一些关于如何实现我想要做的事情的指示吗?
我是异步工作的新手,如果我做了一些愚蠢的事情,我深表歉意。任何帮助表示赞赏。谢谢。
如果这是一个相关的细节,我正在 python 3.8.10
。
您正在寻找的东西称为任务,可以使用 asyncio.create_task
函数创建。您尝试过的所有方法都涉及等待协程 io_bound_task(i)
,而 await 的意思是“在继续之前等待它完成”。如果您将协程包装在任务中,那么它将 运行 在后台运行,而不是您必须等待它完成才能继续。
这是使用任务的代码版本:
import asyncio
import random
async def main() -> None:
tasks = []
for i in range(3):
print(f"starting task {i}")
tasks.append(asyncio.create_task(io_bound_task(i)))
for task in tasks:
result = await task
print(f"finished task {result}")
async def io_bound_task(i: int) -> int:
await asyncio.sleep(random.random())
return i
asyncio.run(main())
输出:
starting task 0
starting task 1
starting task 2
finished task 0
finished task 1
finished task 2
您还可以使用 asyncio.gather
(如果您在继续之前需要所有结果)或 asyncio.wait
来等待多个任务,而不是循环。例如,如果任务 2 在任务 0 之前完成并且您不想等待任务 0,您可以这样做:
async def main() -> None:
pending = []
for i in range(3):
print(f"starting task {i}")
pending.append(asyncio.create_task(io_bound_task(i)))
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
result = await task
print(f"finished task {result}")