Python 3.9:如何在非异步函数中正确等待锁

Python 3.9: How to correctly await a lock in non-async function

TL;DR: 问:如何锁定保护共享资源免受 py39 中同步功能的影响?

抱歉,如果这个问题已经得到解答 - 我只是没法轻易找到它。 我正在努力如何在访问共享资源时正确等待锁定。 我的 API 正在访问一个永远登录的后端 API,所以我正在缓存 auth 令牌,并在它过期时更新:

async def _login():
    global connection_lock
    while True:
        async with connection_lock:
            # return a new connection or the one which has been cached in the meantime
            # or raise whichever error received while attempting to login


class BackendSystem:
    @property
    def client(self):
        if self._client is None or cache().get('headers') is None:
            self._client = loop.run_until_complete(_login())
        return self._client

现在的问题是,在某些情况下,对 flask 的多个请求(可能是并行的)将被并行发送到 flask,从而导致此错误:

  File "/usr/lib/python3.6/asyncio/base_events.py", line 471, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
    raise RuntimeError('This event loop is already running')

我确实找到了安装 nest-asyncio 的建议,但我不确定这是否真的是我需要的 TBH。如果我理解正确,那么循环不是嵌套的(即 运行 从彼此内部,但我很可能是错误的),而是我正在尝试使用已经 运行ning - 这部分可能是非法的?

令我难以置信的是,从一个非异步的函数中对共享资源进行非常基本的锁定保护竟然如此困难,而且我无意将其转换为异步。

我创建了一个 Reproducible Example(很有帮助!):

import asyncio


connection_lock = asyncio.Lock()
loop = asyncio.get_event_loop()


class Client:
    def __init__(self, username: str):
        self.username = username

    def __str__(self):
        return f"Client({self.username!r})"


async def _login(username) -> Client:
    global connection_lock
    while True:
        print(f"{username} will lock")
        async with connection_lock:
            print(f"{username} got the lock")
            await asyncio.sleep(5)  # sleep a bit
            print(f"{username} has finished")
            return Client(username)


class BackendSystem:
    def __init__(self, username: str):
        self._client = None
        self.username = username

    @property
    def client(self):
        if self._client is None:
            self._client = loop.run_until_complete(_login(self.username))
        return self._client


def main1():
    def do_something(username: str):
        print(BackendSystem(username).client)

    for username in ["Steffen", "Lenormju"]:
        do_something(username)


def main2():
    async def do_something(username: str):
        print(BackendSystem(username).client)

    future = asyncio.gather(
        do_something("Steffen"), do_something("Lenormju")
    )
    results = loop.run_until_complete(future)
    return results


if __name__ == '__main__':
    print("main1 ================")
    main1()
    print("main2 ================")
    main2()

输出为:

main1 ================
Steffen will lock
Steffen got the lock
Steffen has finished
Client('Steffen')
Lenormju will lock
Lenormju got the lock
Lenormju has finished
Client('Lenormju')
main2 ================
Lenormju will lock
Lenormju got the lock
Steffen will lock

Traceback (most recent call last):
  File "C:/PycharmProjects/stack_overflow/68159604.py", line 62, in <module>
    main2()
  File "C:/PycharmProjects/stack_overflow/68159604.py", line 54, in main2
    results = loop.run_until_complete(future)
  File "C:\Program Files\Python3.6.8\lib\asyncio\base_events.py", line 484, in run_until_complete
    return future.result()
  File "C:/PycharmProjects/stack_overflow/68159604.py", line 49, in do_something
    print(BackendSystem(username).client)
  File "C:/PycharmProjects/stack_overflow/68159604.py", line 35, in client
    self._client = loop.run_until_complete(_login(self.username))
  File "C:\Program Files\Python3.6.8\lib\asyncio\base_events.py", line 471, in run_until_complete
    self.run_forever()
  File "C:\Program Files\Python3.6.8\lib\asyncio\base_events.py", line 425, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

当有并发请求时出现问题,main1表明顺序工作正常。当 Flask 的代码为 运行(已经在事件循环中)时,它确认您遇到了它。

一个更简单的例子是:

import asyncio


loop = asyncio.get_event_loop()


if __name__ == '__main__':
    async def lock_5_seconds():
        await asyncio.sleep(5)

    def do_synchronously_something_async():
        loop.run_until_complete(lock_5_seconds())

    async def do_something_async():
        do_synchronously_something_async()

    loop.run_until_complete(
        asyncio.gather(
            do_something_async(),
            do_something_async()
        )
    )

正在查看 asyncio.loop.run_until_complete documentation :

Run until the future [...] has completed.

this answer 中所述:

Event loop's method such as run_forever or run_until_complete — are just a ways to start event loop in general.

如果你从未停止过,你就不能多次打电话给他们。 Flask 为你启动一次,你不应该自己重新启动它。

但我想(重新)启动事件循环不是你的意思。
您的实际问题是您想同步调用异步函数。实际上,异步函数在事件循环中被安排到 运行。你不能直接打电话给他们。

但是 this question 的答案告诉你只调用 loop.run_until_complete,这在你的情况下不起作用,因为你已经有一个事件循环 运行ning.

这是关于 Reddit 上类似案例的讨论:Calling async functions from synchronous functions. And one on Whosebug from someone using FastAPI : Python: Call asynchronous code from synchronous method when there is already an event loop running

结论是没有方法完全你想要的。
我认为你应该改变你的设计:对你的 client 属性 的调用目前是同步的,尽管它必须调用异步(slowwww)代码(_login 函数)。

您没有说明其余代码的作用,但如果它是遥控器的包装器 API 我推荐类似的东西:

async def login():
    ...

class BackendSystem():
    async def ensure_is_logged():
        ...

    async def call_this_api_method():
        await self.ensure_is_logged()
        ...

并拥抱异步代码。

或者只是不要让您的登录功能异步。将两者混合在一起是令人头痛的秘诀。