运行 里面的并发任务 asyncio.as_completed
running concurrent tasks inside asyncio.as_completed
有人可以向我解释为什么下面的代码不起作用,以及我如何重构它,让它起作用。
import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def io_bound(device):
sleep(5)
return 5
async def sleepy_time(result):
await asyncio.sleep(5)
print(result)
async def main(loop):
futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]
for f in asyncio.as_completed(futures):
result = await f
task = asyncio.create_task(sleepy_time(result))
await task
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(
max_workers=3,
)
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
在 asyncio.as_completed 节中,如果我只打印结果,它会同时打印 3 个结果 - 这符合我的预期。但是,当我等待另一项任务时——我希望 asyncio 会启动该任务并进入下一个未来,但事实并非如此;它阻塞直到 asyncio.sleep(5) 有 运行 然后在下一个未来移动。
我已经测试过将两个任务放在 as_completed 节中并且它们 运行 同时进行。
如何让上面的代码运行第二组任务并发?
将运行第二组任务并发,做成一个任务集合,然后用asyncio.wait()等待。
async def main(loop):
futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]
tasks = []
for f in asyncio.as_completed(futures):
result = await f
tasks.append(asyncio.create_task(sleepy_time(result)))
await asyncio.wait(tasks)
当你 await 异步函数中的某些东西时,它会阻止函数体的进一步执行,直到 await 解决。
虽然 await asyncio.sleep() 也允许事件循环中的其他任务 运行.
来自 python docs:
sleep()
always suspends the current task, allowing other tasks to run.
Wrap the coro coroutine into a Task
and schedule its execution. Return the Task object.
这意味着创建的任务将 运行 通过事件循环,无论您是否等待它。
你可以通过切换来测试
await asyncio.wait(tasks)
到
await asyncio.sleep(10)
此外,如果事件循环在任务可能 运行 之前停止,您会得到“任务已被销毁但它正在等待处理!”错误。
如果事件循环没有机会 运行 任务,那可能会发生。例如,如果在您的 async def main() 中您使用 create_task() 创建了任务,但根本没有等待任何东西.
有人可以向我解释为什么下面的代码不起作用,以及我如何重构它,让它起作用。
import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def io_bound(device):
sleep(5)
return 5
async def sleepy_time(result):
await asyncio.sleep(5)
print(result)
async def main(loop):
futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]
for f in asyncio.as_completed(futures):
result = await f
task = asyncio.create_task(sleepy_time(result))
await task
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(
max_workers=3,
)
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
在 asyncio.as_completed 节中,如果我只打印结果,它会同时打印 3 个结果 - 这符合我的预期。但是,当我等待另一项任务时——我希望 asyncio 会启动该任务并进入下一个未来,但事实并非如此;它阻塞直到 asyncio.sleep(5) 有 运行 然后在下一个未来移动。
我已经测试过将两个任务放在 as_completed 节中并且它们 运行 同时进行。
如何让上面的代码运行第二组任务并发?
将运行第二组任务并发,做成一个任务集合,然后用asyncio.wait()等待。
async def main(loop):
futures = [loop.run_in_executor(executor, io_bound, x) for x in range(6)]
tasks = []
for f in asyncio.as_completed(futures):
result = await f
tasks.append(asyncio.create_task(sleepy_time(result)))
await asyncio.wait(tasks)
当你 await 异步函数中的某些东西时,它会阻止函数体的进一步执行,直到 await 解决。
虽然 await asyncio.sleep() 也允许事件循环中的其他任务 运行.
来自 python docs:
sleep()
always suspends the current task, allowing other tasks to run.
Wrap the coro coroutine into a
Task
and schedule its execution. Return the Task object.
这意味着创建的任务将 运行 通过事件循环,无论您是否等待它。
你可以通过切换来测试
await asyncio.wait(tasks)
到
await asyncio.sleep(10)
此外,如果事件循环在任务可能 运行 之前停止,您会得到“任务已被销毁但它正在等待处理!”错误。
如果事件循环没有机会 运行 任务,那可能会发生。例如,如果在您的 async def main() 中您使用 create_task() 创建了任务,但根本没有等待任何东西.