Python functools LRU 缓存在多个进程之间共享
Python functools LRU cache shared between multiple processes
假设我有 4 个负载平衡的 Python API 进程来计算一个数字的阶乘。
假设阶乘 returns 一个 Pydantic 对象或深层嵌套的字典。我不想使用 Redis 进行缓存,因为嵌套 dict/list 序列化很昂贵。所以我使用了LRU函数缓存。
问题:每个进程有4个LRU缓存。当我清除缓存时,它只清除 1 个进程(以捕获请求为准)。
- 我想在所有 4 个进程之间共享 LRU 缓存。使用 multiprocessing.shared_memory 的自定义装饰器是否可行?
- 如果那不可能,我想至少清除所有进程的缓存。使用多处理队列或 Listener/Client 阻止 API 功能,因为我必须
while True
.
多个 Python API 进程 运行 此代码:
from functools import lru_cache
# result:
# caches factorial for this process
# wanted:
# caches factorial for all 4 processes
@lru_cache
def factorial(n):
return n * factorial(n-1) if n else 1
@api.get("http://localhost:5000/purge/factorial/") # pseudo API decorator
def cache_clear():
# result:
# clears cache for this process
# wanted:
# clears cache for all processes
factorial.cache_clear()
我使用:
非常感谢!
我的问题不是 JSON/pickle 序列化速度慢。由于缓存的变量大小较大,从 Redis 存储中检索值很慢。
这是在 Python 进程之间共享数据的简单解决方案。此解决方案将您的 Python 变量转换为字节(腌制它们),因此如果序列化速度慢是您的问题,这可能不适合您。
Server(保存缓存的单独 Python 进程):
from multiprocessing.managers import SyncManager
syncdict = {}
def get_dict():
return syncdict
if __name__ == "__main__":
SyncManager.register("syncdict", get_dict)
manager = SyncManager(("127.0.0.1", 5002), authkey=b"password")
manager.start()
input()
manager.shutdown()
Client(我的 FastAPI 应用程序中的模块):
from multiprocessing.managers import SyncManager
class CacheClient:
def __init__(self, host: str, port: int, authkey: bytes):
self.manager = SyncManager((host, port), authkey=authkey)
self.manager.connect()
SyncManager.register("syncdict")
self.syncdict = self.manager.syncdict()
def set(self, key: str, value: any):
self.syncdict.update([(key, value)])
def get(self, key: str) -> any:
return self.syncdict.get(key)
cache = CacheClient(host="127.0.0.1", port=5002, authkey=b"password")
提示: 如果您在尝试缓存 Pydantic 类 时遇到导入错误,请在 syncdict.update
之前腌制您的数据。但是,请注意,这会将您的数据腌制两次。
假设我有 4 个负载平衡的 Python API 进程来计算一个数字的阶乘。
假设阶乘 returns 一个 Pydantic 对象或深层嵌套的字典。我不想使用 Redis 进行缓存,因为嵌套 dict/list 序列化很昂贵。所以我使用了LRU函数缓存。
问题:每个进程有4个LRU缓存。当我清除缓存时,它只清除 1 个进程(以捕获请求为准)。
- 我想在所有 4 个进程之间共享 LRU 缓存。使用 multiprocessing.shared_memory 的自定义装饰器是否可行?
- 如果那不可能,我想至少清除所有进程的缓存。使用多处理队列或 Listener/Client 阻止 API 功能,因为我必须
while True
.
多个 Python API 进程 运行 此代码:
from functools import lru_cache
# result:
# caches factorial for this process
# wanted:
# caches factorial for all 4 processes
@lru_cache
def factorial(n):
return n * factorial(n-1) if n else 1
@api.get("http://localhost:5000/purge/factorial/") # pseudo API decorator
def cache_clear():
# result:
# clears cache for this process
# wanted:
# clears cache for all processes
factorial.cache_clear()
我使用:
非常感谢!
我的问题不是 JSON/pickle 序列化速度慢。由于缓存的变量大小较大,从 Redis 存储中检索值很慢。
这是在 Python 进程之间共享数据的简单解决方案。此解决方案将您的 Python 变量转换为字节(腌制它们),因此如果序列化速度慢是您的问题,这可能不适合您。
Server(保存缓存的单独 Python 进程):
from multiprocessing.managers import SyncManager
syncdict = {}
def get_dict():
return syncdict
if __name__ == "__main__":
SyncManager.register("syncdict", get_dict)
manager = SyncManager(("127.0.0.1", 5002), authkey=b"password")
manager.start()
input()
manager.shutdown()
Client(我的 FastAPI 应用程序中的模块):
from multiprocessing.managers import SyncManager
class CacheClient:
def __init__(self, host: str, port: int, authkey: bytes):
self.manager = SyncManager((host, port), authkey=authkey)
self.manager.connect()
SyncManager.register("syncdict")
self.syncdict = self.manager.syncdict()
def set(self, key: str, value: any):
self.syncdict.update([(key, value)])
def get(self, key: str) -> any:
return self.syncdict.get(key)
cache = CacheClient(host="127.0.0.1", port=5002, authkey=b"password")
提示: 如果您在尝试缓存 Pydantic 类 时遇到导入错误,请在 syncdict.update
之前腌制您的数据。但是,请注意,这会将您的数据腌制两次。