如何在终止事件循环之前等待所有任务完成?

How to wait for all tasks to finish before terminating the event loop?

Python 中确保所有并发任务在事件循环结束前完成的标准方法是什么?这是一个简化的例子:

import asyncio

async def foo(delay):
    print("Start foo.") # Eg: Send message
    asyncio.create_task(bar(delay))
    print("End foo.")

async def bar(delay):
    print("Start bar.")
    await asyncio.sleep(delay)
    print("End bar.") # Eg: Delete message after delay

def main():
    asyncio.run(foo(2))

if __name__ == "__main__":
    main()

当前输出:

Start foo. # Eg: Send message
End foo.
Start bar.

期望的输出:

Start foo. # Eg: Send message
End foo.
Start bar.
End bar. # Eg: Delete message after delay

我已经尝试 运行 loop.run_until_complete() 之后的所有未完成任务,但这不起作用,因为到那时循环将被终止。我还尝试将主要功能修改为以下内容:

async def main():
    await foo(2)

    tasks = asyncio.all_tasks()
    if len(tasks) > 0:
        await asyncio.wait(tasks)

if __name__ == "__main__":
    asyncio.run(main())

输出是正确的,但它永远不会终止,因为协程 main() 是任务之一。上面的设置也是discord.py发送消息,过段时间删除,只是用loop.run_forever()代替,所以没有遇到问题。

asyncio(和类似的框架)中没有等待所有任务的标准方法,事实上不应该尝试。就线程而言,一个Task表示bothregular and daemon activities。不加选择地等待所有任务可能会导致应用程序无限期停止。

创建但从未 awaited 的任务实际上是 background/daemon 任务。相反,如果一个任务不应该被视为 background/daemon 那么调用者有责任确保它是 awaited.


最简单的解决方案是让每个协程 await and/or 取消它产生的所有任务。

async def foo(delay):
    print("Start foo.")
    task = asyncio.create_task(bar(delay))
    print("End foo.")
    await task  # foo is done here, it ensures the other task finishes as well

由于 async/tasks 的全部意义在于进行廉价的任务切换,因此这是一种廉价的操作。它也不应该影响任何设计良好的应用程序:

  • 如果一个函数的目的是产生一个值,那么任何子任务都应该是产生该值的一部分。
  • 如果函数的目的是产生副作用,则任何子任务都应该是该副作用的一部分。

对于更复杂的情况,return 任何未完成的任务都是值得的。

async def foo(delay):
    print("Start foo.")
    task = asyncio.create_task(bar(delay))
    print("End foo.")
    return task  # allow the caller to wait for our child tasks

这要求调用者明确处理未完成的任务,但给予及时回复和最大控制权。然后顶级任务负责处理任何孤立任务。

对于一般的 async 编程,structured programming paradigm encodes the idea of "handling outstanding tasks" in a managing object. In Python, this pattern has been encoded by the trio library 作为所谓的 Nursery 对象。

import trio

async def foo(delay, nursery):
    print("Start foo.")
    # spawning a task via a nursery means *someone* awaits it
    nursery.start_soon(bar, delay)
    print("End foo.")

async def bar(delay):
    print("Start bar.")
    await trio.sleep(delay)
    print("End bar.")

async def main():
    # a task may spawn a nursery and pass it to child tasks
    async with trio.open_nursery() as nursery:
        await foo(2, nursery)

if __name__ == "__main__":
    trio.run(main)

虽然有人建议 asyncio as TaskGroups, so far it has been deferred 使用此模式。 但是,asyncio 模式的各种端口可通过第三方库获得。