我可以在异步协程上使用阻塞锁吗?

Can I use an blocking lock on an asynchronous coroutine?

我试图了解在异步上下文中使用同步锁的问题是什么。在我的代码中,我已经达到了多个协程可能访问同一个共享资源的地步。 API 获取这个资源是同步的,我现在不想把它变成异步的。

这是我设计的API。

class OAuthManager(abc.ABC):
    """
    Represents an OAuth token manager.

    The manager main responsibility is to return an oauth access token
    and manage it's lifecycle (i.e. refreshing and storage).
    """
    @abc.abstractmethod
    def get_access_token(self) -> OAuthToken:
        """
        Should return the [OAuthToken] used to authorize the client
        to access OAuth protected resources.

        The implementation should handle refreshing and saving the token,
        if necessary.
        """
        pass

本质上,OAuth 管理器应该能够始终 return 令牌,无论是来自内存、存储还是通过从第三方刷新令牌 API。

我通过将同步任务卸载到后台线程来进行异步调用,我想这不会是一个大问题,因为这些方法都是 IO 绑定的。

class AsyncEndpoint:
    """
    Transforms an sync endpoint into an async one.

    The asynchronous tasks are executed by offloading a synchronous task into
    a executor thread. Due to the GIL, this method is only useful with IO
    bound tasks.
    """
    def __init__(self, endpoint):
        self.endpoint = endpoint

    def __getattr__(self, name: str) -> Union[Any, Callable]:
        attr = getattr(self.endpoint, name)

        if not callable(attr):
            return attr

        async def _wrapper(*args, **kwargs):
            return await utils.run_async(attr, *args, **kwargs)

        return _wrapper

异步 运行 的方法将使用同步 get_access_token 方法,并且此访问令牌是协同程序之间的共享资源。

我正在尝试使用装饰器模式为此方法添加一个锁。

class LockOAuthManager(OAuthManager):
    """
     Most of the times tokens will be retrieved from memory
     which means the lock won't be locked for a long time,
     In the worst case, the lock will remain locked for 
     an full http request period.
     
     But if one thread needed to retrieve the token from the
     API, the same would have to happen to the other threads,
     so I believe the waiting penalty would happen anyway.
    """
    def __init__(self, oauth_manager: OAuthManager):
        self.lock = threading.Lock()
        self.oauth_manager = oauth_manager

    def get_access_token(self) -> OAuthToken:
        with self.lock:
            return self.oauth_manager.get_access_token()

令人惊讶的是,我没有找到太多关于这个主题的信息,这可能指出我做错了什么,但即使我做错了,我仍然不想知道可以做些什么错了。

据我了解,阻塞锁(即 threading.Lock)持有用于 运行 协程的线程,这意味着可以使用同一线程的其他协程不会'无法运行。异步锁(即 asyncio.Lock)将允许将线程分配给其他任务。

所以我的问题是:

协程内部使用阻塞锁会不会导致其他并发问题?

其他不访问共享资源的协程是否会受到锁的影响(即是否可以在其他线程上调度)?

Will any other concurrency problems occur due to the use of the blocking lock inside the coroutine?

这取决于 run_async 的作用。据我了解你的问题,阻塞锁不在“协程内部”,它在一个普通函数内部,它本身是通过 run_async 实用程序调用的。由于该函数 运行 在单独的线程中,因此 threading.Lock 是保护共享资源的自然方式。

您找不到太多关于此的信息,因为您正在做的不是 asyncio 的设计用途。如果您的代码包含阻塞调用,您可能应该使用线程而不是 asyncio。 asyncio 的全部要点是在幕后使用 async 调用,并使用 await 在单个线程内实现并发以将执行让给其他线程。如果使用得当,asyncio 可以提高可伸缩性并消除由不受控制的线程切换引起的竞争条件。在您的代码中,您没有获得这些好处,因为您最终只是使用了一个线程池,为同步付出了代价。