在异步环境中缓存结果

Caching results in an async environment

我在 FastAPI 端点工作,该端点进行 I/O 绑定操作,这是异步的以提高效率。但是,这需要时间,所以我想把结果缓存起来,以便在一段时间内重复使用。

我现在有这个:

from fastapi import FastAPI
import asyncio

app = FastAPI()

async def _get_expensive_resource(key) -> None:
    await asyncio.sleep(2)
    return True

@app.get('/')
async def get(key):
    return await _get_expensive_resource(key)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("test:app")

我正在尝试使用 cachetools 包来缓存结果,我已经尝试了类似以下的操作:

import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
  
app = FastAPI()

async def _get_expensive_resource(key) -> None:
    await asyncio.sleep(2)
    return True

class ResourceCache(TTLCache):
    def __missing__(self, key):
        loop = asyncio.get_event_loop()
        resource = loop.run_until_complete(_get_expensive_resource(key))
        self[key] = resource
        return resource

resource_cache = ResourceCache(124, 300)

@app.get('/')
async def get(key: str):
    return resource_cache[key]

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("test2:app")

但是,这失败了,因为据我了解,__missing__ 方法是同步的,您不能从异步中调用异步。错误是:

RuntimeError: this event loop is already running.

如果我使用普通的 asyncio 而不是 uvloop,也会发生类似的错误。

对于 asyncio 事件循环,我尝试使用 nest_asyncio 包,但它没有修补 uvloop 而且,即使与 asyncio 一起使用,似乎服务在使用后冻结这是第一次。

你知道我怎样才能完成这个吗?

15 天内遇到此问题的其他人(包括我自己)的自动回答:

TTLCache 像普通的 python 字典一样工作,访问丢失的键将调用 __missing__ 方法。因此,如果字典中存在值,我们希望使用该值,如果不存在,我们可以在此方法中收集资源。此方法还应设置缓存中的键(以便下次出现)和 return 这次使用的值。

class ResourceCache(TTLCache):
    def __missing__(self, key) -> asyncio.Task:
        # Create a task 
        resource_future = asyncio.create_task(_get_expensive_resource(key))
        self[key] = resource_future
        return resource_future

因此,我们有一个将键映射到 asyncio.Task 的缓存(本质上是一个字典)。任务将在事件循环中异步执行(已经由 FastAPI 启动!)。当我们需要结果时,我们可以 await 在端点代码中或实际上在任何地方为它们提供结果,只要它和异步函数!

@app.get("/")
async def get(key:str) -> bool:
    return await resource_cache[key]

第二次调用此端点(在缓存超时内)将使用缓存的资源(在我们的示例中模拟为 'true')。