等待子任务在外部任务完成事件上完成(add_done_callback 的异步版本)

Await sub task finished on outer task finished event (async version of add_done_callback)

假设我们有一些任务(子任务)应该在外部任务完成时完成。我们无法控制外部任务:我们不知道它什么时候完成(它可能发生在子任务完成之前),我们不能等待内部的子任务。

在此代码段中,我们将收到警告,因为外部任务在子任务之前完成:

import asyncio


def create_sub_task():
    sub_task = asyncio.ensure_future(sub())
    # We want this sub_task to be finished when outer task done


async def sub():
    await asyncio.sleep(2)
    print('sub done')


async def main():  # main is outer task for sub_task
    create_sub_task()
    await asyncio.sleep(1)
    print('outer done')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

add_done_callback 看起来像是一种捕捉外部任务完成时刻的方法,但我们不能在这里等待子任务:这个函数是同步的。

我发现的方法是使用事件循环的私有 _run_once 函数在回调中同步等待任务完成:

import asyncio
from functools import partial


def create_sub_task():
    sub_task = asyncio.ensure_future(sub())

    # Callback to wait for sub_task
    outer_task = asyncio.Task.current_task()
    outer_task.add_done_callback(partial(_stop_task, sub_task))


async def sub():
    await asyncio.sleep(2)
    print('sub done')


def _stop_task(sub_task, task):
    # Ugly way to wait sub_task finished:
    loop = asyncio.get_event_loop()
    while not sub_task.done():
        loop._run_once()


async def main():  # main is outer task for sub_task
    create_sub_task()
    await asyncio.sleep(1)
    print('outer done')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

它有效,但它是一种丑陋的方式,有很多可能的问题。

关于如何更好地解决任务有什么想法吗?

据我所知,没有内部机制就无法解决这个问题。就我个人而言,将来我会 asyncio.gather 外部任务和子任务,然后重写回调。

不幸的是,Future 的回调列表未通过 public 接口公开(我正在使用 _callbacks):

import asyncio

def create_sub_task():
    sub_task = asyncio.ensure_future(sub())
    outer_task = asyncio.Task.current_task()

    multi_fut = asyncio.gather(sub_task, outer_task)
    for cb in outer_task._callbacks:
        multi_fut.add_done_callback(cb)
        outer_task.remove_done_callback(cb)

async def sub():
    await asyncio.sleep(2)
    print('sub done')


async def main():  # main is outer task for sub_task
    create_sub_task()
    await asyncio.sleep(1)
    print('outer done')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

我假设您不想或不能更改流程,但我鼓励您重新考虑。也许 post 某些上下文 - 约束起源。