Why is the parent coroutine not cancelled?

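I'm playing with Python asyncio. My program has only three coroutines. Two of them I schedule directly, and the third is scheduled by one of the first two. I want the program to shut down cleanly when the user presses Ctrl+C:
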
import asyncio

async def coro1():
    try:
        print('coro1')
        await asyncio.sleep(1000)
    except Exception as e:
        print('coro1 exc %s' % repr(e))
        raise

async def coro2():
    try:
        print('coro2')
        await asyncio.ensure_future(coro3())
        await asyncio.sleep(1000)
    except Exception as e:
        print('coro2 exc %s' % repr(e))
        raise

async def coro3():
    try:
        print('coro3')
        await asyncio.sleep(1000)
    except Exception as e:
        print('coro3 exc %s' % repr(e))
        raise

loop = asyncio.get_event_loop()    
try:
    f1 = asyncio.ensure_future(coro1())
    f2 = asyncio.ensure_future(coro2())    
    loop.run_forever()
except KeyboardInterrupt:
    print('Exiting... Cancelling all tasks')

    f2.cancel()
    f1.cancel()

    # This code gives the same result:
    # for task in asyncio.tasks.Task.all_tasks(loop):
    #    task.cancel()

    print('Cancellation is done!')

    loop.stop()
    loop.run_forever()
finally:
    loop.close()

This code produces the following output:

coro1
coro2
coro3
^CExiting... Cancelling all tasks
Cancellation is done!
coro3 exc CancelledError()
coro1 exc CancelledError()
Task was destroyed but it is pending!
task: <Task pending coro=<coro2() running at test.py:15> wait_for=<Task cancelled coro=<coro3() done, defined at test.py:23>>>

So I wonder: why was coro2 not cancelled, while coro3 actually was?

Got it! The problem is these two lines in the except block:

# ...
loop.stop()
loop.run_forever()

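The expected cancellation propagation is cut short by loop.stop(). Calling loop.stop() right before loop.run_forever() lets the loop run only a single iteration. That is enough for coro1 and coro3 to receive CancelledError, but coro2 is only woken up by the completion callback of coro3's task, and that callback is queued for the next iteration, which never runs. Changing the code to something like the following fixes it: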

# ...

try:
    f1 = asyncio.ensure_future(coro1())
    f2 = asyncio.ensure_future(coro2())    
    loop.run_forever()
except KeyboardInterrupt:
    print('Exiting... Cancelling all tasks')
    f2.cancel()
    f1.cancel()
    print('Cancellation is done!')    
    try:
        loop.run_forever()
        # Wait a very short time for f2 cancelation and press Ctrl+C again.
    except KeyboardInterrupt:
        loop.stop()
        loop.run_forever()
finally:
    loop.close()

With this change, the message Task was destroyed but it is pending! goes away.
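
The effect can be reproduced in isolation. Below is a minimal sketch, not the code from the question: child, parent, run_one_iteration and demo are names I made up, and it assumes that loop.stop() followed by loop.run_forever() processes exactly the callbacks that are already queued. It steps the loop by hand and records the state of both tasks after each step:

```python
import asyncio


async def child():
    await asyncio.sleep(1000)


async def parent(started):
    # Schedule the child as its own task and wait on it, like coro2 -> coro3.
    child_task = asyncio.ensure_future(child())
    started.append(child_task)
    await child_task


def run_one_iteration(loop):
    # stop() followed by run_forever() processes the callbacks that are
    # already queued, then returns.
    loop.stop()
    loop.run_forever()


def demo():
    loop = asyncio.new_event_loop()
    try:
        started = []
        parent_task = loop.create_task(parent(started))
        run_one_iteration(loop)  # parent starts and schedules the child
        run_one_iteration(loop)  # child starts and blocks in sleep()
        child_task = started[0]

        parent_task.cancel()     # the request is forwarded to the child task

        run_one_iteration(loop)  # child receives CancelledError
        after_one = (child_task.cancelled(), parent_task.done())

        run_one_iteration(loop)  # parent is woken by the child's completion
        after_two = (child_task.cancelled(), parent_task.cancelled())
        return after_one, after_two
    finally:
        loop.close()


if __name__ == "__main__":
    print(demo())
```

After the first iteration the child is already cancelled while the parent is still pending, which is the state the original program was in when it closed the loop. The parent only finishes one iteration later.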

A better approach is to use the loop.run_until_complete() method:

f1 = asyncio.ensure_future(coro1())
f2 = asyncio.ensure_future(coro2())
tasks = asyncio.gather(f1, f2)
try:
    loop.run_until_complete(tasks)
except KeyboardInterrupt:
    print('Exiting... Cancelling all tasks')
    tasks.cancel()  # or f1.cancel(); f2.cancel()
    print('Cancellation is done!')

    try:
        # Drive the loop until the cancelled gather future is done.
        loop.run_until_complete(tasks)
    except asyncio.CancelledError:
        pass  # expected: the gathered tasks were cancelled
finally:
    loop.close()

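run_until_complete() adds an internal done callback that stops the loop as soon as the future passed to it (here the gather of both tasks) has finished or been cancelled.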
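
On Python 3.7 and later, the same cancel-then-wait pattern can be written inside a coroutine and driven by asyncio.run(). This is only a sketch under my own assumptions: worker, parent, cancel_and_wait and main are made-up names, and the short sleep just stands in for "the program is running". Note that it catches asyncio.CancelledError explicitly, because since Python 3.8 CancelledError derives from BaseException and is no longer caught by except Exception:

```python
import asyncio


async def worker(name, log):
    try:
        await asyncio.sleep(1000)
    except asyncio.CancelledError:
        log.append(name)  # record that cancellation reached this coroutine
        raise


async def parent(log):
    try:
        # Wait on a separately scheduled child task, like coro2 -> coro3.
        await asyncio.ensure_future(worker("child", log))
    except asyncio.CancelledError:
        log.append("parent")
        raise


async def cancel_and_wait(tasks):
    for task in tasks:
        task.cancel()
    # return_exceptions=True makes gather() wait for every task and collect
    # the CancelledError instances instead of raising the first one.
    await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    log = []
    tasks = [
        asyncio.ensure_future(worker("solo", log)),
        asyncio.ensure_future(parent(log)),
    ]
    await asyncio.sleep(0.05)  # give every task time to start and block
    await cancel_and_wait(tasks)
    return log, [task.cancelled() for task in tasks]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Waiting on gather() after cancelling is what gives the parent the extra loop iterations it needs. As far as I can tell, asyncio.run() does a similar cancel-and-wait for any tasks still pending when main() exits.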