如何在 asyncio 中查找 CancelledError 的原因?

How to find the cause of CancelledError in asyncio?

我有一个依赖一些第三方库的大项目,有时它的执行会被 CancelledError 中断。

为了演示这个问题,让我们看一个小例子:

import asyncio


async def main():
    task = asyncio.create_task(foo())

    # Cancel the task in 1 second.
    loop = asyncio.get_event_loop()
    loop.call_later(1.0, lambda: task.cancel())

    await task


async def foo():
    await asyncio.sleep(999)


if __name__ == '__main__':
    asyncio.run(main())

回溯:

Traceback (most recent call last):
  File "/Users/ss/Library/Application Support/JetBrains/PyCharm2021.2/scratches/async.py", line 19, in <module>
    asyncio.run(main())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError

如您所见,没有关于 CancelledError 来源的信息。我如何找出它的确切原因?

我想出的一种方法是放置很多 try/except 块来捕获 CancelledError 并缩小它的来源范围。但这很乏味。

如果不调用 call_exception_handler inside of it. That provides a context to the exception handler which can be customized using set_exception_handler.

,则无法处理任务的异步异常

如果您正在创建异步任务,则必须在协程中使用 try/except,这可能会发生异常。我已经演示了一些使用 set_exception_handler

捕获异步任务异常的最小实现
import asyncio
import logging
from asyncio import CancelledError


def async_error_handler(loop, context):
    logger = logging.Logger(__name__)
    logger.error(context.get("message"))


async def main():
    loop = asyncio.get_running_loop()
    
    # Set custom exception handler
    loop.set_exception_handler(async_error_handler)
    task = loop.create_task(foo())

    # Cancel the task after 1 second
    loop.call_later(1, task.cancel)

    await task


async def foo():
    try:
        await asyncio.sleep(999)
    except CancelledError:
        # Catch the case when the coroutine has been canceled
        loop = asyncio.get_running_loop()

        # Emit an event to exception handler with custom context
        loop.call_exception_handler(context={
            "message": "Task has been canceled"
        })


if __name__ == "__main__":
    asyncio.run(main())

context has more attributes that can also be customized. Read more about error handling here.

我已经通过对项目中的每个异步函数应用装饰器来解决它。装饰器的工作很简单——当函数引发 CancelledError 时记录一条消息。这样我们就可以看到哪些功能(更重要的是,按什么顺序)被取消了。

装饰器代码如下:

def log_cancellation(f):
    async def wrapper(*args, **kwargs):
        try:
            return await f(*args, **kwargs)
        except asyncio.CancelledError:
            print(f"Cancelled {f}")
            raise
    return wrapper

为了在任何地方添加这个装饰器,我使用了正则表达式。查找:(.*)(async def)。替换为:@log_cancellation\n.

同时为了避免在每个文件中导入 log_cancellation 我修改了内置函数: builtins.log_cancellation = log_cancellation