在 python asyncio 中取消嵌套协程
Cancel nested coroutines in python asyncio
在我的应用程序中,我有一个协程,它可能等待其他几个协程,如果这个协程中的每个协程都可能等待另一个协程,依此类推。
如果其中一个协程失败,则无需执行所有其他尚未执行的协程。 (在我的例子中,这甚至是有害的,我想改为启动多个回滚协程)。
那么,如何取消所有嵌套协程的执行呢?这是我现在拥有的:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo2 done')
async def bar():
await asyncio.gather(bar1(), bar2())
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
await asyncio.sleep(0.5)
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
await asyncio.gather(task_foo, task_bar)
except Exception:
print('One task failed. Canceling all')
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
这显然行不通。如您所见,foo
协程已取消,如我所愿,但 bar2
仍然是 运行:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4
所以,我肯定做错了什么。这里正确的方法是什么?
据我所知,在取消协程本身的同时自动取消协程的所有子任务是不可能的。所以你必须手动清理子任务。
在等待 asyncio.gather 未来时抛出异常时,您可以通过 Gathering_future 对象的 _children
属性访问剩余任务。
您的示例有效:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo2 done')
async def bar():
gathering = asyncio.gather(bar1(), bar2())
try:
await gathering
except Exception:
# cancel all subtasks of this coroutine
[task.cancel() for task in gathering._children]
raise
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
try:
await asyncio.sleep(0.5)
except asyncio.CancelledError:
# you can cleanup here
print("Bar2 cancelled")
break
else:
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
task = asyncio.gather(task_foo, task_bar)
await task
except Exception:
print('One task failed. Canceling all')
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
returns
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Bar2 cancelled
One task failed. Canceling all
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4
当您调用 task_bar.cancel()
时,任务已经完成,因此没有效果。作为 gather docs state:
If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
这正是正在发生的事情,将您的 task_bar
协程稍微修改为:
async def bar():
try:
await asyncio.gather(bar1(), bar2())
except Exception:
print("Got a generic exception on bar")
raise
输出:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Got a generic exception on bar
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4
我还在 task_bar.cancel()
调用之前打印 task_bar
,注意它已经完成,所以调用 cancel
没有效果。
就解决方案而言,我认为生成的协程需要处理它安排的协程的取消,因为协程完成后我找不到检索它们的方法(除了滥用 Task.all_tasks
听起来不对)。
说过我不得不在第一个异常中使用 wait
而不是 gather
和 return,这里有一个完整的例子:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo done')
async def bar():
done, pending = await asyncio.wait(
[bar1(), bar2()], return_when=asyncio.FIRST_EXCEPTION)
for task in pending:
task.cancel()
for task in done:
task.result() # needed to raise the exception if it happened
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
await asyncio.sleep(0.5)
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
await asyncio.gather(task_foo, task_bar)
except Exception:
print('One task failed. Canceling all')
print(task_bar)
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
输出:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines_2.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4
它不是很好,但是很管用。
在我的应用程序中,我有一个协程,它可能等待其他几个协程,如果这个协程中的每个协程都可能等待另一个协程,依此类推。 如果其中一个协程失败,则无需执行所有其他尚未执行的协程。 (在我的例子中,这甚至是有害的,我想改为启动多个回滚协程)。 那么,如何取消所有嵌套协程的执行呢?这是我现在拥有的:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo2 done')
async def bar():
await asyncio.gather(bar1(), bar2())
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
await asyncio.sleep(0.5)
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
await asyncio.gather(task_foo, task_bar)
except Exception:
print('One task failed. Canceling all')
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
这显然行不通。如您所见,foo
协程已取消,如我所愿,但 bar2
仍然是 运行:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4
所以,我肯定做错了什么。这里正确的方法是什么?
据我所知,在取消协程本身的同时自动取消协程的所有子任务是不可能的。所以你必须手动清理子任务。
在等待 asyncio.gather 未来时抛出异常时,您可以通过 Gathering_future 对象的 _children
属性访问剩余任务。
您的示例有效:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo2 done')
async def bar():
gathering = asyncio.gather(bar1(), bar2())
try:
await gathering
except Exception:
# cancel all subtasks of this coroutine
[task.cancel() for task in gathering._children]
raise
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
try:
await asyncio.sleep(0.5)
except asyncio.CancelledError:
# you can cleanup here
print("Bar2 cancelled")
break
else:
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
task = asyncio.gather(task_foo, task_bar)
await task
except Exception:
print('One task failed. Canceling all')
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
returns
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Bar2 cancelled
One task failed. Canceling all
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4
当您调用 task_bar.cancel()
时,任务已经完成,因此没有效果。作为 gather docs state:
If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
这正是正在发生的事情,将您的 task_bar
协程稍微修改为:
async def bar():
try:
await asyncio.gather(bar1(), bar2())
except Exception:
print("Got a generic exception on bar")
raise
输出:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Got a generic exception on bar
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4
我还在 task_bar.cancel()
调用之前打印 task_bar
,注意它已经完成,所以调用 cancel
没有效果。
就解决方案而言,我认为生成的协程需要处理它安排的协程的取消,因为协程完成后我找不到检索它们的方法(除了滥用 Task.all_tasks
听起来不对)。
说过我不得不在第一个异常中使用 wait
而不是 gather
和 return,这里有一个完整的例子:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo done')
async def bar():
done, pending = await asyncio.wait(
[bar1(), bar2()], return_when=asyncio.FIRST_EXCEPTION)
for task in pending:
task.cancel()
for task in done:
task.result() # needed to raise the exception if it happened
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
await asyncio.sleep(0.5)
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
await asyncio.gather(task_foo, task_bar)
except Exception:
print('One task failed. Canceling all')
print(task_bar)
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
输出:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines_2.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4
它不是很好,但是很管用。