How to cancel all remaining tasks in gather if one fails?
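In case one task of gather raises an exception, the others are still allowed to continue.
Well, that's not exactly what I need. I want to distinguish between errors that are fatal and need to cancel all remaining tasks, and errors that are not fatal and should instead be logged while allowing the other tasks to continue.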
Here is my failed attempt to implement this:
from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())
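(The finally: await sleep here is there to keep the interpreter from shutting down immediately, which would cancel all tasks on its own.)
Oddly, calling cancel on the gather doesn't actually cancel it!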
PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
I am very surprised by this behavior since it seems to contradict the documentation, which states:
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutine objects or futures.
(...)
Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. (...)
What am I missing here? How can I cancel the remaining tasks?
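The problem with your implementation is that it calls sleepers.cancel() after sleepers has already raised. Technically, the future returned by gather() is then in a completed state, so cancelling it must be a no-op.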
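To correct the code, you just need to cancel the children yourself instead of trusting gather's future to do it. Of course, coroutines themselves are not cancelable, so you need to convert them to tasks first (which gather would do anyway, so you're doing no extra work). For example: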
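import asyncio

# my_sleep and ErrorThatShouldCancelOtherTasks are defined in the question.
async def main():
    # Wrap the coroutines in tasks so each one can be cancelled individually.
    tasks = [asyncio.ensure_future(my_sleep(secs))
             for secs in [2, 5, 7]]
    try:
        await asyncio.gather(*tasks)
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        for t in tasks:
            t.cancel()  # no-op for tasks that have already finished
    finally:
        await asyncio.sleep(5)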
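The question also asks for non-fatal errors to be logged while the other tasks keep running. A minimal sketch of one way to get both behaviors, assuming Python 3.7+ (for asyncio.run): each coroutine is wrapped so that any exception other than ErrorThatShouldCancelOtherTasks is logged and turned into a None result, and only the fatal error makes the caller cancel the siblings. The names log_nonfatal, gather_cancel_on_fatal, demo_nonfatal, demo_fatal and the two-argument my_sleep are hypothetical, made up for this illustration; they are not part of asyncio.

```python
import asyncio
import logging


class ErrorThatShouldCancelOtherTasks(Exception):
    """Fatal error: all sibling tasks must be cancelled (name from the question)."""


async def log_nonfatal(coro):
    # Hypothetical wrapper: fatal errors and cancellation propagate unchanged;
    # any other exception is logged and the result becomes None.
    try:
        return await coro
    except (ErrorThatShouldCancelOtherTasks, asyncio.CancelledError):
        raise
    except Exception:
        logging.exception('Non-fatal error; other tasks continue')
        return None


async def gather_cancel_on_fatal(*coros):
    # Hypothetical helper: wrap every coroutine in a task so it can be cancelled.
    tasks = [asyncio.ensure_future(log_nonfatal(c)) for c in coros]
    try:
        return await asyncio.gather(*tasks)
    except ErrorThatShouldCancelOtherTasks:
        for t in tasks:
            t.cancel()  # no-op for tasks that have already finished
        # Wait until the cancelled tasks have really stopped, then re-raise.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


async def my_sleep(secs, error=None):
    # Variant of the question's my_sleep: optionally raise after sleeping.
    await asyncio.sleep(secs)
    if error is not None:
        raise error
    return secs


async def demo_nonfatal():
    # ValueError is not fatal: it is logged and its slot in the result is None.
    return await gather_cancel_on_fatal(my_sleep(0.01), my_sleep(0.02, ValueError('oops')))


async def demo_fatal():
    events = []

    async def slow():
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            events.append('slow cancelled')
            raise

    try:
        await gather_cancel_on_fatal(
            my_sleep(0.01, ErrorThatShouldCancelOtherTasks('fatal')), slow())
    except ErrorThatShouldCancelOtherTasks:
        events.append('fatal error')
    return events


if __name__ == '__main__':
    print(asyncio.run(demo_nonfatal()))
    print(asyncio.run(demo_fatal()))
```

The second gather(..., return_exceptions=True) only waits until the cancelled tasks have actually finished, so nothing is left running in the background when the fatal error is re-raised.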
"I am very surprised by this behavior since it seems to be contradictory to the documentation[...]"
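The initial stumbling block with gather is that it doesn't really run tasks; it's just a helper that waits for them to finish. For this reason, gather doesn't cancel the remaining tasks if some of them fail with an exception. It just abandons the wait and propagates the exception, leaving the remaining tasks running in the background. This was reported as a bug, but wasn't fixed for backward compatibility and because the behavior is documented and unchanged from the beginning.

But here we have another wart: the documentation explicitly promises being able to cancel the returned future. Your code does exactly that and it doesn't work, without it being obvious why (at least it took me a while to figure it out, and it required reading the source). It turns out that the contract of Future actually prevents this from working. By the time you call cancel(), the future returned by gather has already completed, and cancelling a completed future is meaningless; it is just a no-op. (The reason is that a completed future has a well-defined result that could already have been observed by outside code. Cancelling it would change its result, which is not allowed.)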
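Both effects can be observed directly. A small sketch, assuming Python 3.7+; fail_soon and inspect_gather are made-up names for illustration. Once awaiting the gather future has raised, that future is already done, its cancel() returns False, and the sibling awaitable is still pending.

```python
import asyncio


async def fail_soon():
    await asyncio.sleep(0.01)
    raise RuntimeError('boom')


async def inspect_gather():
    sibling = asyncio.ensure_future(asyncio.sleep(0.5))
    outer = asyncio.gather(fail_soon(), sibling)
    try:
        await outer
    except RuntimeError:
        pass
    report = {
        'outer_done': outer.done(),         # the exception has already been set
        'cancel_returned': outer.cancel(),  # cancelling a done future is a no-op
        'sibling_done': sibling.done(),     # the sibling is still running
    }
    sibling.cancel()  # clean up by hand, since gather did not
    return report


if __name__ == '__main__':
    print(asyncio.run(inspect_gather()))
```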
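In other words, the documentation is not wrong, because cancelling would have worked if you had done it before await sleepers had completed. However, it is misleading, because it appears to allow cancelling gather() in the important use case where one of its awaitables raises, but in reality it doesn't.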
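For contrast, a sketch of the case where the documented cancellation does apply (Python 3.7+; cancel_pending_gather is a hypothetical name): cancelling the gather future while it is still pending propagates the cancellation to every child.

```python
import asyncio


async def cancel_pending_gather():
    children = [asyncio.ensure_future(asyncio.sleep(10)) for _ in range(2)]
    outer = asyncio.gather(*children)
    await asyncio.sleep(0.01)  # give the children a chance to start
    accepted = outer.cancel()  # outer is still pending, so this reaches the children
    try:
        await outer
    except asyncio.CancelledError:
        pass  # awaiting a cancelled gather raises CancelledError
    return accepted, [child.cancelled() for child in children]


if __name__ == '__main__':
    print(asyncio.run(cancel_pending_gather()))
```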
Problems like this that pop up when using gather are the reason why many people eagerly await (no pun intended) trio-style nurseries in asyncio.
You can create your own custom gather function that cancels all of its children when any exception occurs:
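import asyncio

async def gather(*tasks, **kwargs):
    # ensure_future() wraps bare coroutines in tasks (so they can be cancelled)
    # and returns existing futures and tasks unchanged.
    tasks = [asyncio.ensure_future(task) for task in tasks]
    try:
        return await asyncio.gather(*tasks, **kwargs)
    except BaseException:
        # Cancel every child; cancel() is a no-op for children that already finished.
        for task in tasks:
            task.cancel()
        raise

# Inside a coroutine: if a() or b() raises an exception,
# whichever of them is still running is cancelled immediately.
a_result, b_result = await gather(a(), b())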