Asyncio:当其中一个任务抛出异常时终止所有任务
Asyncio: terminating all tasks when one of them throws exception
我正在编写 python 运行异步任务的上下文管理器。我希望我的经理终止,如果它的任何任务抛出异常。这是示例代码:
class MyClass:
def __init__(self):
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
self.loop = asyncio.get_event_loop()
def __enter__(self):
return self
def __exit__(self, excType, excValue, tb):
try:
self.loop.run_until_complete(self._exit_loop())
finally:
self.loop.close()
if excType is not None:
print(excType.__name__, ':', excValue)
traceback.print_tb(tb)
async def _exit_loop(self):
tasks = [task for task in asyncio.all_tasks(self.loop) if
task is not asyncio.current_task(self.loop)]
list(map(lambda task: task.cancel(), tasks))
results = await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
async def func1(self):
while True:
print('func1')
await asyncio.sleep(1)
async def func2(self):
i = 5
while i > 0:
print('func2')
await asyncio.sleep(1)
i -= 1
raise Exception
async def _async_start(self):
self.loop.create_task(self.func1())
self.loop.create_task(self.func2())
def start(self):
self.loop.run_until_complete(self._async_start())
with MyClass() as myClass:
myClass.start()
myClass.loop.run_forever()
这是此脚本的输出:
func1
func2
func1
func2
func1
func2
func1
func2
func1
func2
Task exception was never retrieved
func1
future: <Task finished coro=<MyClass.func2() done, defined at /home/framal/Programy/schnapps/schnapps/bottle/client.py:381> exception=Exception()>
Traceback (most recent call last):
File "/home/framal/Programy/schnapps/schnapps/bottle/client.py", line 387, in func2
raise Exception
Exception
func1
func1
func1
.
.
.
我尝试使用自定义异常处理程序,但没有任何效果 - 它们在强制终止进程后立即启动 运行。
如何将异常传递给循环,以便它关闭所有其他任务?
我不明白您为什么要这样使用上下文管理器 (CM)。也许有更好的方法。
无论如何,如果给出了 CM 并且您将 loop.run_forever()
放入 with
块,我知道在这种情况下退出循环的唯一方法是将控制权传递给 CM 的退出函数是 loop.stop()
.
这是一个小装饰器,可以处理除 loop.stop()
取消之外的所有异常。
def watchdog(afunc):
@functools.wraps(afunc)
async def run(*args, **kwargs):
try:
await afunc(*args, **kwargs)
except asyncio.CancelledError:
return
except Exception as err:
print("exception {err}")
asyncio.get_event_loop().stop()
return run
如果您装饰所有由 CM(func1
和 func2
)作为任务启动的协程,例如:
@watchdog
async def func2(self):
然后它将在第一次异常后停止。
我正在编写 python 运行异步任务的上下文管理器。我希望我的经理终止,如果它的任何任务抛出异常。这是示例代码:
class MyClass:
def __init__(self):
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
self.loop = asyncio.get_event_loop()
def __enter__(self):
return self
def __exit__(self, excType, excValue, tb):
try:
self.loop.run_until_complete(self._exit_loop())
finally:
self.loop.close()
if excType is not None:
print(excType.__name__, ':', excValue)
traceback.print_tb(tb)
async def _exit_loop(self):
tasks = [task for task in asyncio.all_tasks(self.loop) if
task is not asyncio.current_task(self.loop)]
list(map(lambda task: task.cancel(), tasks))
results = await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
async def func1(self):
while True:
print('func1')
await asyncio.sleep(1)
async def func2(self):
i = 5
while i > 0:
print('func2')
await asyncio.sleep(1)
i -= 1
raise Exception
async def _async_start(self):
self.loop.create_task(self.func1())
self.loop.create_task(self.func2())
def start(self):
self.loop.run_until_complete(self._async_start())
with MyClass() as myClass:
myClass.start()
myClass.loop.run_forever()
这是此脚本的输出:
func1
func2
func1
func2
func1
func2
func1
func2
func1
func2
Task exception was never retrieved
func1
future: <Task finished coro=<MyClass.func2() done, defined at /home/framal/Programy/schnapps/schnapps/bottle/client.py:381> exception=Exception()>
Traceback (most recent call last):
File "/home/framal/Programy/schnapps/schnapps/bottle/client.py", line 387, in func2
raise Exception
Exception
func1
func1
func1
.
.
.
我尝试使用自定义异常处理程序,但没有任何效果 - 它们在强制终止进程后立即启动 运行。
如何将异常传递给循环,以便它关闭所有其他任务?
我不明白您为什么要这样使用上下文管理器 (CM)。也许有更好的方法。
无论如何,如果给出了 CM 并且您将 loop.run_forever()
放入 with
块,我知道在这种情况下退出循环的唯一方法是将控制权传递给 CM 的退出函数是 loop.stop()
.
这是一个小装饰器,可以处理除 loop.stop()
取消之外的所有异常。
def watchdog(afunc):
@functools.wraps(afunc)
async def run(*args, **kwargs):
try:
await afunc(*args, **kwargs)
except asyncio.CancelledError:
return
except Exception as err:
print("exception {err}")
asyncio.get_event_loop().stop()
return run
如果您装饰所有由 CM(func1
和 func2
)作为任务启动的协程,例如:
@watchdog
async def func2(self):
然后它将在第一次异常后停止。