如何在 Python 3 中嵌套异步上下文管理器

How to nest Async Context Managers in Python 3

假设我们有两个通常以嵌套方式一起使用的异步上下文管理器,但只有第二个的结果通常在主体中使用。例如,如果我们发现自己经常输入以下内容:

async with context_mgr_1() as cm1:
    async with cm2.context_mgr_2() as cm2:
        ...do something with cm2...

我们如何创建一个嵌套这些上下文管理器的上下文管理器,以便我们可以这样做:

async with context_mgr_2() as cm2:
    ...do something with cm2...

contextlib.nested 用于为非异步上下文管理器完成此操作,但我在 asyncio 中找不到这样的助手。

在内部,我已经开始使用它来将同步和异步上下文管理器包装为异步管理器。它允许 AsyncExitStack 风格的推送语义以及简单地包装多个管理器。

它经过了很好的测试,但我没有发布测试或计划支持它,所以使用风险自负...

import asyncio
import logging
import sys

from functools import wraps

class AsyncContextManagerChain(object):

    def __init__(self, *managers):
        self.managers = managers
        self.stack = []
        self.values = []

    async def push(self, manager):
        try:
            if hasattr(manager, '__aenter__'):
                value = await manager.__aenter__()
            else:
                value = manager.__enter__()

            self.stack.append(manager)
            self.values.append(value)
            return value
        except:
            # if we encounter an exception somewhere along our enters,
            # we'll stop adding to the stack, and pop everything we've
            # added so far, to simulate what would happen when an inner
            # block raised an exception.
            swallow = await self.__aexit__(*sys.exc_info())
            if not swallow:
                raise

    async def __aenter__(self):
        value = None

        for manager in self.managers:
            value = await self.push(manager)

        return value

    async def __aexit__(self, exc_type, exc, tb):
        excChanged = False
        swallow = False # default value
        while self.stack:
            # no matter what the outcome, we want to attempt to call __aexit__ on
            # all context managers
            try:
                swallow = await self._pop(exc_type, exc, tb)
                if swallow:
                    # if we swallow an exception on an inner cm, outer cms would
                    # not receive it at all...
                    exc_type = None
                    exc = None
                    tb = None
            except:
                # if we encounter an exception while exiting, that is the
                # new execption we send upward
                excChanged = True
                (exc_type, exc, tb) = sys.exc_info()
                swallow = False

        if exc is None:
            # when we make it to the end, if exc is None, it was swallowed
            # somewhere along the line, and we've exited everything successfully,
            # so tell python to swallow the exception for real
            return True
        elif excChanged:
            # if the exception has been changed, we need to raise it here
            # because otherwise python will just raise the original exception
            if not swallow:
                raise exc
        else:
            # we have the original exception still, we just let python handle it...
            return swallow

    async def _pop(self, exc_type, exc, tb):
    manager = self.stack.pop()
    if hasattr(manager, '__aexit__'):
        return await manager.__aexit__(exc_type, exc, tb)
    else:
        return manager.__exit__(exc_type, exc, tb)

Kevin 的回答不遵循 3.5.2 中的 contextlib.ExitStack impl,因此我继续并根据 python 3.5.2 中的官方 impl 创建了一个。如果我发现任何问题,我会更新 impl。

github要点link:https://gist.github.com/thehesiod/b8442ed50e27a23524435a22f10c04a0

因为 Python 3.7 有 contextlib.AsyncExitStack:

async with AsyncExitStack() as stack:
    cm1 = await stack.enter_async_context(context_mgr_1())
    cm2 = await stack.enter_async_context(context_mgr_2())
    # ...do something with cm2...

可以动态使用:

async with AsyncExitStack() as stack:
    connections = [await stack.enter_async_context(get_connection())
        for i in range(5)]
    # All opened connections will automatically be released at the end of
    # the async with statement, even if attempts to open a connection
    # later in the list raise an exception.

它继承了contextlib.ExitStack,因此您可以组合异步和非异步上下文管理器:

async with AsyncExitStack() as stack:
    cm1 = await stack.enter_async_context(context_mgr_1())
    f = stack.enter_context(open('file.txt'))
    cm2 = await stack.enter_async_context(context_mgr_2())
    # ...