在异步环境中缓存结果
Caching results in an async environment
我在 FastAPI 端点工作,该端点进行 I/O 绑定操作,这是异步的以提高效率。但是,这需要时间,所以我想把结果缓存起来,以便在一段时间内重复使用。
我现在有这个:
from fastapi import FastAPI
import asyncio
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
@app.get('/')
async def get(key):
return await _get_expensive_resource(key)
if __name__ == "__main__":
import uvicorn
uvicorn.run("test:app")
我正在尝试使用 cachetools
包来缓存结果,我已经尝试了类似以下的操作:
import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
class ResourceCache(TTLCache):
def __missing__(self, key):
loop = asyncio.get_event_loop()
resource = loop.run_until_complete(_get_expensive_resource(key))
self[key] = resource
return resource
resource_cache = ResourceCache(124, 300)
@app.get('/')
async def get(key: str):
return resource_cache[key]
if __name__ == "__main__":
import uvicorn
uvicorn.run("test2:app")
但是,这失败了,因为据我了解,__missing__
方法是同步的,您不能从异步中调用异步。错误是:
RuntimeError: this event loop is already running.
如果我使用普通的 asyncio 而不是 uvloop,也会发生类似的错误。
对于 asyncio 事件循环,我尝试使用 nest_asyncio
包,但它没有修补 uvloop
而且,即使与 asyncio 一起使用,似乎服务在使用后冻结这是第一次。
你知道我怎样才能完成这个吗?
15 天内遇到此问题的其他人(包括我自己)的自动回答:
TTLCache
像普通的 python 字典一样工作,访问丢失的键将调用 __missing__
方法。因此,如果字典中存在值,我们希望使用该值,如果不存在,我们可以在此方法中收集资源。此方法还应设置缓存中的键(以便下次出现)和 return 这次使用的值。
class ResourceCache(TTLCache):
def __missing__(self, key) -> asyncio.Task:
# Create a task
resource_future = asyncio.create_task(_get_expensive_resource(key))
self[key] = resource_future
return resource_future
因此,我们有一个将键映射到 asyncio.Task 的缓存(本质上是一个字典)。任务将在事件循环中异步执行(已经由 FastAPI 启动!)。当我们需要结果时,我们可以 await
在端点代码中或实际上在任何地方为它们提供结果,只要它和异步函数!
@app.get("/")
async def get(key:str) -> bool:
return await resource_cache[key]
第二次调用此端点(在缓存超时内)将使用缓存的资源(在我们的示例中模拟为 'true')。
我在 FastAPI 端点工作,该端点进行 I/O 绑定操作,这是异步的以提高效率。但是,这需要时间,所以我想把结果缓存起来,以便在一段时间内重复使用。
我现在有这个:
from fastapi import FastAPI
import asyncio
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
@app.get('/')
async def get(key):
return await _get_expensive_resource(key)
if __name__ == "__main__":
import uvicorn
uvicorn.run("test:app")
我正在尝试使用 cachetools
包来缓存结果,我已经尝试了类似以下的操作:
import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
class ResourceCache(TTLCache):
def __missing__(self, key):
loop = asyncio.get_event_loop()
resource = loop.run_until_complete(_get_expensive_resource(key))
self[key] = resource
return resource
resource_cache = ResourceCache(124, 300)
@app.get('/')
async def get(key: str):
return resource_cache[key]
if __name__ == "__main__":
import uvicorn
uvicorn.run("test2:app")
但是,这失败了,因为据我了解,__missing__
方法是同步的,您不能从异步中调用异步。错误是:
RuntimeError: this event loop is already running.
如果我使用普通的 asyncio 而不是 uvloop,也会发生类似的错误。
对于 asyncio 事件循环,我尝试使用 nest_asyncio
包,但它没有修补 uvloop
而且,即使与 asyncio 一起使用,似乎服务在使用后冻结这是第一次。
你知道我怎样才能完成这个吗?
15 天内遇到此问题的其他人(包括我自己)的自动回答:
TTLCache
像普通的 python 字典一样工作,访问丢失的键将调用 __missing__
方法。因此,如果字典中存在值,我们希望使用该值,如果不存在,我们可以在此方法中收集资源。此方法还应设置缓存中的键(以便下次出现)和 return 这次使用的值。
class ResourceCache(TTLCache):
def __missing__(self, key) -> asyncio.Task:
# Create a task
resource_future = asyncio.create_task(_get_expensive_resource(key))
self[key] = resource_future
return resource_future
因此,我们有一个将键映射到 asyncio.Task 的缓存(本质上是一个字典)。任务将在事件循环中异步执行(已经由 FastAPI 启动!)。当我们需要结果时,我们可以 await
在端点代码中或实际上在任何地方为它们提供结果,只要它和异步函数!
@app.get("/")
async def get(key:str) -> bool:
return await resource_cache[key]
第二次调用此端点(在缓存超时内)将使用缓存的资源(在我们的示例中模拟为 'true')。