在 python asyncio 中取消嵌套协程

Cancel nested coroutines in python asyncio

在我的应用程序中,我有一个协程,它可能等待其他几个协程,如果这个协程中的每个协程都可能等待另一个协程,依此类推。 如果其中一个协程失败,则无需执行所有其他尚未执行的协程。 (在我的例子中,这甚至是有害的,我想改为启动多个回滚协程)。 那么,如何取消所有嵌套协程的执行呢?这是我现在拥有的:

import asyncio

async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo2 done')

async def bar():
    await asyncio.gather(bar1(), bar2())


async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')


async def bar2():
    for i in range(5):
        print('Bar2', i)
        await asyncio.sleep(0.5)
    print('Bar2 done')


async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)

async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        await asyncio.gather(task_foo, task_bar)
    except Exception:
        print('One task failed. Canceling all')
        task_foo.cancel()
        task_bar.cancel()
    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

这显然行不通。如您所见,foo 协程已取消,如我所愿,但 bar2 仍然是 运行:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4

所以,我肯定做错了什么。这里正确的方法是什么?

据我所知,在取消协程本身的同时自动取消协程的所有子任务是不可能的。所以你必须手动清理子任务。 在等待 asyncio.gather 未来时抛出异常时,您可以通过 Gathering_future 对象的 _children 属性访问剩余任务。 您的示例有效:

import asyncio

async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo2 done')

async def bar():
    gathering = asyncio.gather(bar1(), bar2())
    try:
        await gathering
    except Exception:
        # cancel all subtasks of this coroutine
        [task.cancel() for task in gathering._children]
        raise

async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')

async def bar2():
    for i in range(5):
        print('Bar2', i)
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            # you can cleanup here
            print("Bar2 cancelled")
            break
    else:
        print('Bar2 done')

async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)

async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        task = asyncio.gather(task_foo, task_bar)
        await task
    except Exception:
        print('One task failed. Canceling all')
        task_foo.cancel()
        task_bar.cancel()
    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

returns

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Bar2 cancelled
One task failed. Canceling all
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4

当您调用 task_bar.cancel() 时,任务已经完成,因此没有效果。作为 gather docs state:

If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

这正是正在发生的事情,将您的 task_bar 协程稍微修改为:

async def bar():
    try:
        await asyncio.gather(bar1(), bar2())
    except Exception:
        print("Got a generic exception on bar")
        raise

输出:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Got a generic exception on bar
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4

我还在 task_bar.cancel() 调用之前打印 task_bar,注意它已经完成,所以调用 cancel 没有效果。

就解决方案而言,我认为生成的协程需要处理它安排的协程的取消,因为协程完成后我找不到检索它们的方法(除了滥用 Task.all_tasks听起来不对)。

说过我不得不在第一个异常中使用 wait 而不是 gather 和 return,这里有一个完整的例子:

import asyncio


async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo done')


async def bar():
    done, pending = await asyncio.wait(
        [bar1(), bar2()], return_when=asyncio.FIRST_EXCEPTION)

    for task in pending:
        task.cancel()

    for task in done:
        task.result()  # needed to raise the exception if it happened


async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')


async def bar2():
    for i in range(5):
        print('Bar2', i)
        await asyncio.sleep(0.5)
    print('Bar2 done')


async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)


async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        await asyncio.gather(task_foo, task_bar)
    except Exception:
        print('One task failed. Canceling all')
        print(task_bar)
        task_foo.cancel()
        task_bar.cancel()

    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

输出:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines_2.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4

它不是很好,但是很管用。