Asyncio:当其中一个任务抛出异常时终止所有任务

Asyncio: terminating all tasks when one of them throws exception

我正在编写 python 运行异步任务的上下文管理器。我希望我的经理终止,如果它的任何任务抛出异常。这是示例代码:

class MyClass:
  def __init__(self):
    if asyncio.get_event_loop().is_closed():
      asyncio.set_event_loop(asyncio.new_event_loop())

    self.loop = asyncio.get_event_loop()

  def __enter__(self):
    return self

  def __exit__(self, excType, excValue, tb):
    try:
      self.loop.run_until_complete(self._exit_loop())
    finally:
      self.loop.close()

    if excType is not None:
      print(excType.__name__, ':', excValue)
      traceback.print_tb(tb)

  async def _exit_loop(self):
    tasks = [task for task in asyncio.all_tasks(self.loop) if
             task is not asyncio.current_task(self.loop)]
    list(map(lambda task: task.cancel(), tasks))

    results = await asyncio.gather(*tasks, return_exceptions=True)
    self.loop.stop()


  async def func1(self):
    while True:
      print('func1')
      await asyncio.sleep(1)

  async def func2(self):
    i = 5
    while i > 0:
      print('func2')
      await asyncio.sleep(1)
      i -= 1
    raise Exception

  async def _async_start(self):
    self.loop.create_task(self.func1())
    self.loop.create_task(self.func2())

  def start(self):
    self.loop.run_until_complete(self._async_start())

with MyClass() as myClass:
  myClass.start()
  myClass.loop.run_forever()

这是此脚本的输出:

func1
func2
func1
func2
func1
func2
func1
func2
func1
func2
Task exception was never retrieved
func1
future: <Task finished coro=<MyClass.func2() done, defined at /home/framal/Programy/schnapps/schnapps/bottle/client.py:381> exception=Exception()>
Traceback (most recent call last):
  File "/home/framal/Programy/schnapps/schnapps/bottle/client.py", line 387, in func2
    raise Exception
Exception
func1
func1
func1
.
.
.

我尝试使用自定义异常处理程序,但没有任何效果 - 它们在强制终止进程后立即启动 运行。

如何将异常传递给循环,以便它关闭所有其他任务?

我不明白您为什么要这样使用上下文管理器 (CM)。也许有更好的方法。

无论如何,如果给出了 CM 并且您将 loop.run_forever() 放入 with 块,我知道在这种情况下退出循环的唯一方法是将控制权传递给 CM 的退出函数是 loop.stop().

这是一个小装饰器,可以处理除 loop.stop() 取消之外的所有异常。

def watchdog(afunc):
    @functools.wraps(afunc)
    async def run(*args, **kwargs):
        try:
            await afunc(*args, **kwargs)
        except asyncio.CancelledError:
            return
        except Exception as err:
            print("exception {err}")
        asyncio.get_event_loop().stop()
    return run

如果您装饰所有由 CM(func1func2)作为任务启动的协程,例如:

@watchdog
async def func2(self):

然后它将在第一次异常后停止。