使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成
When using asyncio, how do you allow all running tasks to finish before shutting down the event loop
我有以下代码:
@asyncio.coroutine
def do_something_periodically():
while True:
asyncio.async(my_expensive_operation())
yield from asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
我运行这个函数直到完成。设置关闭时会出现问题 - 函数完成并且任何挂起的任务都不会运行。
这是错误:
task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>
如何正确安排关机时间?
为了提供一些上下文,我正在编写一个系统监视器,它每 5 秒从 /proc/stat 读取一次,计算该期间的 cpu 使用情况,然后将结果发送到服务器。我想一直调度这些监控作业,直到我收到sigterm,当我停止调度,等待所有当前作业完成,然后优雅地退出。
您可以检索未完成的任务并 运行 再次循环直到它们完成,然后关闭循环或退出您的程序。
pending = asyncio.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
pending
是待处理任务的列表。
asyncio.gather()
允许同时等待多个任务。
如果你想确保所有的任务都在一个协程中完成(也许你有一个 "main" 协程),你可以这样做,例如:
async def do_something_periodically():
while True:
asyncio.create_task(my_expensive_operation())
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
await asyncio.gather(*asyncio.all_tasks())
此外,在这种情况下,由于所有任务都是在同一个协程中创建的,因此您已经可以访问这些任务:
async def do_something_periodically():
tasks = []
while True:
tasks.append(asyncio.create_task(my_expensive_operation()))
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
await asyncio.gather(*tasks)
从 Python 3.7 开始,上述答案使用了多个 已弃用的 API(asyncio.async 和 Task.all_tasks,@asyncio.coroutine, yield from 等),你应该使用这个:
import asyncio
async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))
async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
await asyncio.sleep(interval)
loop = asyncio.get_event_loop()
coro = do_something_periodically(1, 1)
try:
loop.run_until_complete(coro)
except KeyboardInterrupt:
coro.close()
tasks = asyncio.all_tasks(loop)
expensive_tasks = {task for task in tasks if task._coro.__name__ != coro.__name__}
loop.run_until_complete(asyncio.gather(*expensive_tasks))
您也可以考虑使用 asyncio.shield,尽管这样做您不会 ALL 运行 任务已完成,但只有 屏蔽。但它在某些情况下仍然有用。
除此之外,从 Python 3.7 开始,我们还可以在这里使用高级 API 方法 asynio.run。作为 Python 核心开发人员,Yury Selivanov 建议:
https://youtu.be/ReXxO_azV-w?t=636
注意: asyncio.run 功能已临时添加到 Python 3.7 中的 asyncio。
希望对您有所帮助!
import asyncio
async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))
async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
# using asyncio.shield
await asyncio.shield(asyncio.sleep(interval))
coro = do_something_periodically(1, 1)
if __name__ == "__main__":
try:
# using asyncio.run
asyncio.run(coro)
except KeyboardInterrupt:
print('Cancelled!')
使用包装协程,等待待处理任务计数为 1 后再返回。
async def loop_job():
asyncio.create_task(do_something_periodically())
while len(asyncio.Task.all_tasks()) > 1: # Any task besides loop_job() itself?
await asyncio.sleep(0.2)
asyncio.run(loop_job())
我不确定这是否是您所要求的,但我遇到了类似的问题,这是我想出的最终解决方案。
该代码与 python 3 兼容,仅使用 public asyncio API(意味着没有 hacky _coro
和不推荐使用的 API)。
import asyncio
async def fn():
await asyncio.sleep(1.5)
print('fn')
async def main():
print('main start')
asyncio.create_task(fn()) # run in parallel
await asyncio.sleep(0.2)
print('main end')
def async_run_and_await_all_tasks(main):
def get_pending_tasks():
tasks = asyncio.Task.all_tasks()
pending = [task for task in tasks if task != run_main_task and not task.done()]
return pending
async def run_main():
await main()
while True:
pending_tasks = get_pending_tasks()
if len(pending_tasks) == 0: return
await asyncio.gather(*pending_tasks)
loop = asyncio.new_event_loop()
run_main_coro = run_main()
run_main_task = loop.create_task(run_main_coro)
loop.run_until_complete(run_main_task)
# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)
输出(如预期):
main start
main end
fn
async_run_and_await_all_tasks 函数将使 python 以 nodejs 方式运行:只有在没有未完成的任务时才退出。
如果你想要一种干净的方式来等待在某个本地范围内创建的所有 运行 任务而不泄漏内存(同时防止 garbage collection errors),你可以维护一组 运行 任务并使用 task.add_done_callback(...)
从集合中删除任务。这是为您处理此问题的 class:
class TaskSet:
def __init__(self):
self.tasks = set()
def add(self, coroutine: Coroutine) -> Task:
task = asyncio.create_task(coroutine)
self.tasks.add(task)
task.add_done_callback(lambda _: self.tasks.remove(task))
return task
def __await__(self):
return asyncio.gather(*self.tasks).__await__()
可以这样使用:
async def my_function():
await asyncio.sleep(0.5)
async def go():
tasks = TaskSet()
for i in range(10):
tasks.add(my_function())
await tasks
我注意到一些答案建议使用 asyncio.gather(*asyncio.all_tasks())
,但问题有时可能是一个无限循环,等待 asyncio.current_task()
完成,这本身就是一个问题。一些答案提出了一些复杂的解决方法,包括检查 coro
名称或 len(asyncio.all_tasks())
,但事实证明利用 set
操作非常简单:
async def main():
# Create some tasks.
for _ in range(10):
asyncio.create_task(asyncio.sleep(10))
# Wait for all other tasks to finish other than the current task i.e. main().
await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
我有以下代码:
@asyncio.coroutine
def do_something_periodically():
while True:
asyncio.async(my_expensive_operation())
yield from asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
我运行这个函数直到完成。设置关闭时会出现问题 - 函数完成并且任何挂起的任务都不会运行。
这是错误:
task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>
如何正确安排关机时间?
为了提供一些上下文,我正在编写一个系统监视器,它每 5 秒从 /proc/stat 读取一次,计算该期间的 cpu 使用情况,然后将结果发送到服务器。我想一直调度这些监控作业,直到我收到sigterm,当我停止调度,等待所有当前作业完成,然后优雅地退出。
您可以检索未完成的任务并 运行 再次循环直到它们完成,然后关闭循环或退出您的程序。
pending = asyncio.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
pending
是待处理任务的列表。asyncio.gather()
允许同时等待多个任务。
如果你想确保所有的任务都在一个协程中完成(也许你有一个 "main" 协程),你可以这样做,例如:
async def do_something_periodically():
while True:
asyncio.create_task(my_expensive_operation())
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
await asyncio.gather(*asyncio.all_tasks())
此外,在这种情况下,由于所有任务都是在同一个协程中创建的,因此您已经可以访问这些任务:
async def do_something_periodically():
tasks = []
while True:
tasks.append(asyncio.create_task(my_expensive_operation()))
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
await asyncio.gather(*tasks)
从 Python 3.7 开始,上述答案使用了多个 已弃用的 API(asyncio.async 和 Task.all_tasks,@asyncio.coroutine, yield from 等),你应该使用这个:
import asyncio
async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))
async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
await asyncio.sleep(interval)
loop = asyncio.get_event_loop()
coro = do_something_periodically(1, 1)
try:
loop.run_until_complete(coro)
except KeyboardInterrupt:
coro.close()
tasks = asyncio.all_tasks(loop)
expensive_tasks = {task for task in tasks if task._coro.__name__ != coro.__name__}
loop.run_until_complete(asyncio.gather(*expensive_tasks))
您也可以考虑使用 asyncio.shield,尽管这样做您不会 ALL 运行 任务已完成,但只有 屏蔽。但它在某些情况下仍然有用。
除此之外,从 Python 3.7 开始,我们还可以在这里使用高级 API 方法 asynio.run。作为 Python 核心开发人员,Yury Selivanov 建议:
https://youtu.be/ReXxO_azV-w?t=636
注意: asyncio.run 功能已临时添加到 Python 3.7 中的 asyncio。
希望对您有所帮助!
import asyncio
async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))
async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
# using asyncio.shield
await asyncio.shield(asyncio.sleep(interval))
coro = do_something_periodically(1, 1)
if __name__ == "__main__":
try:
# using asyncio.run
asyncio.run(coro)
except KeyboardInterrupt:
print('Cancelled!')
使用包装协程,等待待处理任务计数为 1 后再返回。
async def loop_job():
asyncio.create_task(do_something_periodically())
while len(asyncio.Task.all_tasks()) > 1: # Any task besides loop_job() itself?
await asyncio.sleep(0.2)
asyncio.run(loop_job())
我不确定这是否是您所要求的,但我遇到了类似的问题,这是我想出的最终解决方案。
该代码与 python 3 兼容,仅使用 public asyncio API(意味着没有 hacky _coro
和不推荐使用的 API)。
import asyncio
async def fn():
await asyncio.sleep(1.5)
print('fn')
async def main():
print('main start')
asyncio.create_task(fn()) # run in parallel
await asyncio.sleep(0.2)
print('main end')
def async_run_and_await_all_tasks(main):
def get_pending_tasks():
tasks = asyncio.Task.all_tasks()
pending = [task for task in tasks if task != run_main_task and not task.done()]
return pending
async def run_main():
await main()
while True:
pending_tasks = get_pending_tasks()
if len(pending_tasks) == 0: return
await asyncio.gather(*pending_tasks)
loop = asyncio.new_event_loop()
run_main_coro = run_main()
run_main_task = loop.create_task(run_main_coro)
loop.run_until_complete(run_main_task)
# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)
输出(如预期):
main start
main end
fn
async_run_and_await_all_tasks 函数将使 python 以 nodejs 方式运行:只有在没有未完成的任务时才退出。
如果你想要一种干净的方式来等待在某个本地范围内创建的所有 运行 任务而不泄漏内存(同时防止 garbage collection errors),你可以维护一组 运行 任务并使用 task.add_done_callback(...)
从集合中删除任务。这是为您处理此问题的 class:
class TaskSet:
def __init__(self):
self.tasks = set()
def add(self, coroutine: Coroutine) -> Task:
task = asyncio.create_task(coroutine)
self.tasks.add(task)
task.add_done_callback(lambda _: self.tasks.remove(task))
return task
def __await__(self):
return asyncio.gather(*self.tasks).__await__()
可以这样使用:
async def my_function():
await asyncio.sleep(0.5)
async def go():
tasks = TaskSet()
for i in range(10):
tasks.add(my_function())
await tasks
我注意到一些答案建议使用 asyncio.gather(*asyncio.all_tasks())
,但问题有时可能是一个无限循环,等待 asyncio.current_task()
完成,这本身就是一个问题。一些答案提出了一些复杂的解决方法,包括检查 coro
名称或 len(asyncio.all_tasks())
,但事实证明利用 set
操作非常简单:
async def main():
# Create some tasks.
for _ in range(10):
asyncio.create_task(asyncio.sleep(10))
# Wait for all other tasks to finish other than the current task i.e. main().
await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})