异步上下文管理器是否需要保护它们的清理代码不被取消?

Do asynchronous context managers need to protect their cleanup code from cancellation?

问题(我认为)

contextlib.asynccontextmanager 文档给出了这个例子:

@asynccontextmanager
async def get_connection():
    conn = await acquire_db_connection()
    try:
        yield conn
    finally:
        await release_db_connection(conn)

在我看来这可能会泄漏资源。如果此代码的任务是 cancelled 而此代码位于其 await release_db_connection(conn) 行,则发布可能会中断。 asyncio.CancelledError 将从 finally 块内的某处向上传播,防止后续清理代码来自 运行.

因此,实际上,如果您要实现一个处理超时请求的 Web 服务器,则在准确错误的时间触发超时可能会导致数据库连接泄漏。

完整的可运行示例

import asyncio
from contextlib import asynccontextmanager

async def acquire_db_connection():
    await asyncio.sleep(1)
    print("Acquired database connection.")
    return "<fake connection object>"

async def release_db_connection(conn):
    await asyncio.sleep(1)
    print("Released database connection.")

@asynccontextmanager
async def get_connection():
    conn = await acquire_db_connection()
    try:
        yield conn
    finally:
        await release_db_connection(conn)

async def do_stuff_with_connection():
    async with get_connection() as conn:
        await asyncio.sleep(1)
        print("Did stuff with connection.")

async def main():
    task = asyncio.create_task(do_stuff_with_connection())

    # Cancel the task just as the context manager running
    # inside of it is executing its cleanup code.
    await asyncio.sleep(2.5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    print("Done.")

asyncio.run(main())

Python 3.7.9 上的输出:

Acquired database connection.
Did stuff with connection.
Done.

请注意,Released database connection 永远不会打印出来。

我的问题

您可以使用 asyncio.shield 保护任务以保证上下文管理器的正常关闭,我只在 main():

中进行了更改
async def main():
    task = asyncio.create_task(do_stuff_with_connection())
    # shield context manager from cancellation
    sh_task = asyncio.shield(task)
    # Cancel the task just as the context manager running
    # inside of it is executing its cleanup code.
    await asyncio.sleep(2.5)
    sh_task.cancel()  # cancel shielded task
    try:
        await sh_task
    except asyncio.CancelledError:
        pass

    await asyncio.sleep(5)  # wait till shielded task is done

    print("Done.")

专注于保护清理不被取消是转移注意力。有很多事情可能出错,上下文管理器无法知道

  • 可能发生哪些错误,
  • 必须防止哪些错误。

资源处理实用程序负责正确处理错误。

  • 如果 release_db_connection 不能被取消,它必须保护自己不被取消。
  • 如果 acquire/release 必须成对 运行,则它必须是单个 async with 上下文管理器。进一步保护,例如反对取消,也可能在内部参与。
async def release_db_connection(conn):
    """
    Cancellation safe variant of `release_db_connection`

    Internally protects against cancellation by delaying it until cleanup.
    """
    # cleanup is run in separate task so that it
    # cannot be cancelled from the outside.
    shielded_release = asyncio.create_task(asyncio.sleep(1))
    # Wait for cleanup completion – unlike `asyncio.shield`,
    # delay any cancellation until we are done.
    try:
        await shielded_release
    except asyncio.CancelledError:
        await shielded_release
        # propagate cancellation when we are done
        raise
    finally:
        print("Released database connection.")

注意:异步清理很棘手。例如,a simple asyncio.shield is not sufficient if the event loop does not wait for shielded tasks. 避免发明自己的保护并依靠底层框架做正确的事情。


任务的取消是 正常 关闭,a) 仍然允许异步操作,b) 可能是 delayed/suppressed。准备处理 CancelledError 清理的协程是明确允许的。

Task.cancel

The coroutine then has a chance to clean up or even deny the request by suppressing the exception with a try … … except CancelledError … finally block. […] Task.cancel() does not guarantee that the Task will be cancelled, although suppressing cancellation completely is not common and is actively discouraged.

强行关机是coroutine.close/GeneratorExit。这对应于立即 同步 关闭并禁止通过 awaitasync forasync with.

暂停

coroutine.close

[…] it raises GeneratorExit at the suspension point, causing the coroutine to immediately clean itself up.