Python:使用 OrderedDict 实现基于大小的 LRU 缓存
Python: using OrderedDict for size-based caching with LRU
我想知道如何使用 OrderedDict 实现基于大小的 LRU 缓存。我遇到的难点是:在调用 __contains__
时把被访问的条目移动到链表头部。下面的实现除了 __contains__
方法之外都能正常工作,而该方法会导致无限递归。请问应该如何实现?
from collections import OrderedDict
class Cache(OrderedDict):
    """OrderedDict subclass that keeps a running total of its values plus access counters.

    NOTE(review): as posted, this version recurses without bound on the
    Python 2.7 ``collections.py`` shown in the accompanying traceback:
    ``OrderedDict.__setitem__`` evaluates ``key not in self`` there, which
    re-enters the overridden ``__contains__`` below.
    """

    def __init__(self, *args, **kwds):
        """Pop the optional ``size_limit`` keyword, initialise the base dict, then zero the counters."""
        self.size_limit = kwds.pop("size_limit", None)
        OrderedDict.__init__(self, *args, **kwds)
        self.val_sum = 0  # running sum of the values assigned through __setitem__
        self.hit = 0  # membership tests that found the key
        self.num_evicted = 0  # entries removed by _check_size_limit
        self.total_req = 0  # every membership test, hit or miss
        self._check_size_limit()

    def __contains__(self, key):
        """Count the request; on a hit also count it, re-insert the entry and return True."""
        self.total_req += 1
        if OrderedDict.__contains__(self, key):
            self.hit += 1
            value = OrderedDict.__getitem__ (self,key)
            # The traceback's recursion starts from this call.
            self.move_item_to_the_top(key, value)
            return True
        else:
            return False

    def move_item_to_the_top(self, key, value):
        """Re-assign ``key`` through the base-class ``__setitem__``."""
        # In the traceback this base-class call runs ``if key not in self``,
        # which dispatches back to Cache.__contains__.
        OrderedDict.__setitem__(self, key, value)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, add it to the running total and enforce the limit."""
        OrderedDict.__setitem__(self, key, value)
        # Added unconditionally, including when ``key`` was already present.
        self.val_sum += value
        self._check_size_limit()

    def _check_size_limit(self):
        """Pop entries from the front until ``val_sum`` no longer exceeds ``size_limit``."""
        if self.size_limit is not None:
            while self.val_sum > self.size_limit:
                # last=False removes from the front of the ordering.
                key, value = self.popitem(last=False)
                self.val_sum -= value
                self.num_evicted += 1

    def get_cache_size(self):
        """Return the running sum of stored values."""
        return self.val_sum

    def get_number_evicted(self):
        """Return how many entries ``_check_size_limit`` has removed."""
        return self.num_evicted

    def get_hit_ratio(self):
        """Return hits divided by total requests as a float."""
        # Raises ZeroDivisionError while total_req is still 0.
        return 1.00 * self.hit / self.total_req

    def get_total_req(self):
        """Return the number of membership tests counted so far."""
        return self.total_req

    def get_hits(self):
        """Return the number of membership tests that found their key."""
        return self.hit
我是这样使用的:
if __name__ == "__main__":
    # Demo: replay (key, size) requests against a cache whose values may sum
    # to at most 10, inserting a key only when the membership test misses.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1, 3), (2, 3), (1, 3), (3, 4), (1, 3), (5, 5)]
    for item in items:
        cache_key = str(item[0])
        obj_size = item[1]
        # A parenthesised single argument prints the same under the Python 2
        # print statement and the Python 3 print function.
        print(item)
        if cache_key not in cache:
            cache[cache_key] = int(obj_size)
    print(cache)
运行你的代码后,我得到以下错误:
python cache.py
(1, 3)
(2, 3)
(1, 3)
Traceback (most recent call last):
File "cache.py", line 68, in <module>
if cache_key not in cache:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
[...]
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
RuntimeError: maximum recursion depth exceeded in __instancecheck__
查看 collections.py 的第 75 行可以发现:OrderedDict.__setitem__ 会执行 `key not in self`,
从而再次调用你重写的 Cache.__contains__,
而它又会调用 OrderedDict.__setitem__,于是形成无限递归。
你可以重写 Cache 类,不再覆盖 __contains__,
而是改用 Cache.__getitem__
来跟踪对缓存的访问:
from collections import OrderedDict
class Cache(OrderedDict):
    """LRU cache bounded by the sum of its values, with access statistics.

    Every stored value is treated as the size of its entry.  When the running
    total (``val_sum``) exceeds ``size_limit``, entries are evicted from the
    least recently used end until the total fits again.  A look-up through
    ``cache[key]`` is counted as a request and, when the key is present, as a
    hit that also marks the entry as most recently used.  Membership tests
    (``key in cache``) are neither counted nor reordered.

    ``val_sum`` is kept in step by ``cache[key] = size``, ``del cache[key]``
    and automatic eviction; other removal methods such as ``pop()`` or
    ``clear()`` are not guaranteed to update it.
    """

    def __init__(self, *args, **kwds):
        """Create the cache.

        Args:
            *args, **kwds: forwarded to ``OrderedDict``; initial items are
                sized and counted like any other insertion.
            size_limit: optional keyword giving the maximum allowed sum of
                the stored values.  ``None`` (the default) disables eviction.
        """
        self.size_limit = kwds.pop("size_limit", None)
        # The counters must exist before OrderedDict.__init__ runs: initial
        # items are inserted through __setitem__, which updates them.
        self.val_sum = 0
        self.hit = 0
        self.num_evicted = 0
        self.total_req = 0
        OrderedDict.__init__(self, *args, **kwds)
        self._check_size_limit()

    def move_item_to_the_top(self, key, value):
        """Make ``key`` the most recently used entry without touching ``val_sum``."""
        # Base-class calls on purpose: this is a reorder, not a size change.
        OrderedDict.__delitem__(self, key)
        OrderedDict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """Return the value for ``key`` and record the access.

        Raises:
            KeyError: if ``key`` is absent; the request is still counted,
                which is what makes it a miss.
        """
        self.total_req += 1
        value = OrderedDict.__getitem__(self, key)
        self.hit += 1
        self.move_item_to_the_top(key, value)
        return value

    def __setitem__(self, key, value):
        """Store ``value`` (the entry's size) under ``key`` and enforce the limit."""
        try:
            previous = OrderedDict.__getitem__(self, key)
        except KeyError:
            previous = 0
        OrderedDict.__setitem__(self, key, value)
        # Replace the old size rather than adding to it when overwriting.
        self.val_sum += value - previous
        self._check_size_limit()

    def __delitem__(self, key):
        """Remove ``key`` and subtract its size from the running total.

        Raises:
            KeyError: if ``key`` is absent.
        """
        value = OrderedDict.__getitem__(self, key)
        OrderedDict.__delitem__(self, key)
        self.val_sum -= value

    def _check_size_limit(self):
        """Evict least recently used entries until ``val_sum`` fits ``size_limit``."""
        if self.size_limit is None:
            return
        while self.val_sum > self.size_limit and len(self):
            oldest = next(iter(self))
            # Go through the base class instead of popitem(): some OrderedDict
            # implementations route popitem() through self[key], which would
            # be miscounted as a request and a hit.
            value = OrderedDict.__getitem__(self, oldest)
            OrderedDict.__delitem__(self, oldest)
            self.val_sum -= value
            self.num_evicted += 1

    def get_cache_size(self):
        """Return the current sum of the stored sizes."""
        return self.val_sum

    def get_number_evicted(self):
        """Return how many entries have been evicted so far."""
        return self.num_evicted

    def get_hit_ratio(self):
        """Return hits / requests as a float, or 0.0 before any request."""
        if not self.total_req:
            return 0.0
        return float(self.hit) / self.total_req

    def get_total_req(self):
        """Return the number of ``cache[key]`` look-ups made so far."""
        return self.total_req

    def get_hits(self):
        """Return the number of look-ups that found their key."""
        return self.hit
if __name__ == "__main__":
    # Demo: replay (key, size) requests; a KeyError from the look-up is the
    # miss signal that triggers insertion.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1, 3), (2, 3), (1, 3), (3, 4), (1, 3), (5, 5)]
    for item in items:
        cache_key = str(item[0])
        obj_size = item[1]
        # A parenthesised single argument prints the same under the Python 2
        # print statement and the Python 3 print function.
        print(item)
        try:
            # The look-up is made only for its side effect of recording the access.
            cache[cache_key]
        except KeyError:
            cache[cache_key] = int(obj_size)
    print(cache)
你仍然可以使用 foo not in cache,
但这样的检查不会被计为命中或未命中。如果想统计每一次访问,请使用推荐的 try/except 写法
[1],例如:
if __name__ == "__main__":
    # Demo: replay (key, size) requests; a KeyError from the look-up is the
    # miss signal that triggers insertion.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1, 3), (2, 3), (1, 3), (3, 4), (1, 3), (5, 5)]
    for item in items:
        cache_key = str(item[0])
        obj_size = item[1]
        # A parenthesised single argument prints the same under the Python 2
        # print statement and the Python 3 print function.
        print(item)
        try:
            # The look-up is made only for its side effect of recording the access.
            cache[cache_key]
        except KeyError:
            cache[cache_key] = int(obj_size)
    print(cache)
[1] 当需要根据某个元素是否存在于列表或字典中来决定执行什么操作时,这是推荐的写法,因为它只需访问容器一次。
官方文档中有一个现成的示例(recipe),见:https://docs.python.org/3/library/collections.html#collections.OrderedDict
class LRU(OrderedDict):
    """Limit size, evicting the least recently used key when full.

    A key counts as used when it is looked up with ``lru[key]`` or when it is
    assigned with ``lru[key] = value``.
    """

    def __init__(self, maxsize=128, *args, **kwds):
        """Create the mapping.

        Args:
            maxsize: maximum number of entries kept.
            *args, **kwds: initial items, forwarded to ``OrderedDict``.
        """
        # Set before the base initialiser runs, because initial items are
        # inserted through __setitem__, which reads maxsize.
        self.maxsize = maxsize
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        """Return the value for ``key`` and mark it as most recently used.

        Raises:
            KeyError: if ``key`` is absent.
        """
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, evicting the oldest entry if over ``maxsize``."""
        # Assigning to an existing key keeps its old position in an
        # OrderedDict; refresh it so a just-updated key is not evicted next.
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        if len(self) > self.maxsize:
            oldest = next(iter(self))
            del self[oldest]
我想知道如何使用 OrderedDict 实现基于大小的 LRU 缓存。我遇到的难点是:在调用 __contains__
时把被访问的条目移动到链表头部。下面的实现除了 __contains__
方法之外都能正常工作,而该方法会导致无限递归。请问应该如何实现?
from collections import OrderedDict
class Cache(OrderedDict):
    """OrderedDict subclass that keeps a running total of its values plus access counters.

    NOTE(review): as posted, this version recurses without bound on the
    Python 2.7 ``collections.py`` shown in the accompanying traceback:
    ``OrderedDict.__setitem__`` evaluates ``key not in self`` there, which
    re-enters the overridden ``__contains__`` below.
    """

    def __init__(self, *args, **kwds):
        """Pop the optional ``size_limit`` keyword, initialise the base dict, then zero the counters."""
        self.size_limit = kwds.pop("size_limit", None)
        OrderedDict.__init__(self, *args, **kwds)
        self.val_sum = 0  # running sum of the values assigned through __setitem__
        self.hit = 0  # membership tests that found the key
        self.num_evicted = 0  # entries removed by _check_size_limit
        self.total_req = 0  # every membership test, hit or miss
        self._check_size_limit()

    def __contains__(self, key):
        """Count the request; on a hit also count it, re-insert the entry and return True."""
        self.total_req += 1
        if OrderedDict.__contains__(self, key):
            self.hit += 1
            value = OrderedDict.__getitem__ (self,key)
            # The traceback's recursion starts from this call.
            self.move_item_to_the_top(key, value)
            return True
        else:
            return False

    def move_item_to_the_top(self, key, value):
        """Re-assign ``key`` through the base-class ``__setitem__``."""
        # In the traceback this base-class call runs ``if key not in self``,
        # which dispatches back to Cache.__contains__.
        OrderedDict.__setitem__(self, key, value)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, add it to the running total and enforce the limit."""
        OrderedDict.__setitem__(self, key, value)
        # Added unconditionally, including when ``key`` was already present.
        self.val_sum += value
        self._check_size_limit()

    def _check_size_limit(self):
        """Pop entries from the front until ``val_sum`` no longer exceeds ``size_limit``."""
        if self.size_limit is not None:
            while self.val_sum > self.size_limit:
                # last=False removes from the front of the ordering.
                key, value = self.popitem(last=False)
                self.val_sum -= value
                self.num_evicted += 1

    def get_cache_size(self):
        """Return the running sum of stored values."""
        return self.val_sum

    def get_number_evicted(self):
        """Return how many entries ``_check_size_limit`` has removed."""
        return self.num_evicted

    def get_hit_ratio(self):
        """Return hits divided by total requests as a float."""
        # Raises ZeroDivisionError while total_req is still 0.
        return 1.00 * self.hit / self.total_req

    def get_total_req(self):
        """Return the number of membership tests counted so far."""
        return self.total_req

    def get_hits(self):
        """Return the number of membership tests that found their key."""
        return self.hit
我是这样使用的:
if __name__ == "__main__":
    # Demo: replay (key, size) requests against a cache whose values may sum
    # to at most 10, inserting a key only when the membership test misses.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1, 3), (2, 3), (1, 3), (3, 4), (1, 3), (5, 5)]
    for item in items:
        cache_key = str(item[0])
        obj_size = item[1]
        # A parenthesised single argument prints the same under the Python 2
        # print statement and the Python 3 print function.
        print(item)
        if cache_key not in cache:
            cache[cache_key] = int(obj_size)
    print(cache)
运行你的代码后,我得到以下错误:
python cache.py
(1, 3)
(2, 3)
(1, 3)
Traceback (most recent call last):
File "cache.py", line 68, in <module>
if cache_key not in cache:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
[...]
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
File "/usr/lib/python2.7/collections.py", line 75, in __setitem__
if key not in self:
File "cache.py", line 20, in __contains__
self.move_item_to_the_top(key, value)
File "cache.py", line 26, in move_item_to_the_top
OrderedDict.__setitem__(self, key, value)
RuntimeError: maximum recursion depth exceeded in __instancecheck__
查看 collections.py 的第 75 行可以发现:OrderedDict.__setitem__ 会执行 `key not in self`,
从而再次调用你重写的 Cache.__contains__,
而它又会调用 OrderedDict.__setitem__,于是形成无限递归。
你可以重写 Cache 类,不再覆盖 __contains__,
而是改用 Cache.__getitem__
来跟踪对缓存的访问:
from collections import OrderedDict
class Cache(OrderedDict):
    """LRU cache bounded by the sum of its values, with access statistics.

    Every stored value is treated as the size of its entry.  When the running
    total (``val_sum``) exceeds ``size_limit``, entries are evicted from the
    least recently used end until the total fits again.  A look-up through
    ``cache[key]`` is counted as a request and, when the key is present, as a
    hit that also marks the entry as most recently used.  Membership tests
    (``key in cache``) are neither counted nor reordered.

    ``val_sum`` is kept in step by ``cache[key] = size``, ``del cache[key]``
    and automatic eviction; other removal methods such as ``pop()`` or
    ``clear()`` are not guaranteed to update it.
    """

    def __init__(self, *args, **kwds):
        """Create the cache.

        Args:
            *args, **kwds: forwarded to ``OrderedDict``; initial items are
                sized and counted like any other insertion.
            size_limit: optional keyword giving the maximum allowed sum of
                the stored values.  ``None`` (the default) disables eviction.
        """
        self.size_limit = kwds.pop("size_limit", None)
        # The counters must exist before OrderedDict.__init__ runs: initial
        # items are inserted through __setitem__, which updates them.
        self.val_sum = 0
        self.hit = 0
        self.num_evicted = 0
        self.total_req = 0
        OrderedDict.__init__(self, *args, **kwds)
        self._check_size_limit()

    def move_item_to_the_top(self, key, value):
        """Make ``key`` the most recently used entry without touching ``val_sum``."""
        # Base-class calls on purpose: this is a reorder, not a size change.
        OrderedDict.__delitem__(self, key)
        OrderedDict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """Return the value for ``key`` and record the access.

        Raises:
            KeyError: if ``key`` is absent; the request is still counted,
                which is what makes it a miss.
        """
        self.total_req += 1
        value = OrderedDict.__getitem__(self, key)
        self.hit += 1
        self.move_item_to_the_top(key, value)
        return value

    def __setitem__(self, key, value):
        """Store ``value`` (the entry's size) under ``key`` and enforce the limit."""
        try:
            previous = OrderedDict.__getitem__(self, key)
        except KeyError:
            previous = 0
        OrderedDict.__setitem__(self, key, value)
        # Replace the old size rather than adding to it when overwriting.
        self.val_sum += value - previous
        self._check_size_limit()

    def __delitem__(self, key):
        """Remove ``key`` and subtract its size from the running total.

        Raises:
            KeyError: if ``key`` is absent.
        """
        value = OrderedDict.__getitem__(self, key)
        OrderedDict.__delitem__(self, key)
        self.val_sum -= value

    def _check_size_limit(self):
        """Evict least recently used entries until ``val_sum`` fits ``size_limit``."""
        if self.size_limit is None:
            return
        while self.val_sum > self.size_limit and len(self):
            oldest = next(iter(self))
            # Go through the base class instead of popitem(): some OrderedDict
            # implementations route popitem() through self[key], which would
            # be miscounted as a request and a hit.
            value = OrderedDict.__getitem__(self, oldest)
            OrderedDict.__delitem__(self, oldest)
            self.val_sum -= value
            self.num_evicted += 1

    def get_cache_size(self):
        """Return the current sum of the stored sizes."""
        return self.val_sum

    def get_number_evicted(self):
        """Return how many entries have been evicted so far."""
        return self.num_evicted

    def get_hit_ratio(self):
        """Return hits / requests as a float, or 0.0 before any request."""
        if not self.total_req:
            return 0.0
        return float(self.hit) / self.total_req

    def get_total_req(self):
        """Return the number of ``cache[key]`` look-ups made so far."""
        return self.total_req

    def get_hits(self):
        """Return the number of look-ups that found their key."""
        return self.hit
if __name__ == "__main__":
    # Demo: replay (key, size) requests; a KeyError from the look-up is the
    # miss signal that triggers insertion.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1, 3), (2, 3), (1, 3), (3, 4), (1, 3), (5, 5)]
    for item in items:
        cache_key = str(item[0])
        obj_size = item[1]
        # A parenthesised single argument prints the same under the Python 2
        # print statement and the Python 3 print function.
        print(item)
        try:
            # The look-up is made only for its side effect of recording the access.
            cache[cache_key]
        except KeyError:
            cache[cache_key] = int(obj_size)
    print(cache)
你仍然可以使用 foo not in cache,
但这样的检查不会被计为命中或未命中。如果想统计每一次访问,请使用推荐的 try/except 写法
[1],例如:
if __name__ == "__main__":
    # Demo: replay (key, size) requests; a KeyError from the look-up is the
    # miss signal that triggers insertion.
    cache_size_B = 10
    cache = Cache(size_limit=cache_size_B)
    items = [(1, 3), (2, 3), (1, 3), (3, 4), (1, 3), (5, 5)]
    for item in items:
        cache_key = str(item[0])
        obj_size = item[1]
        # A parenthesised single argument prints the same under the Python 2
        # print statement and the Python 3 print function.
        print(item)
        try:
            # The look-up is made only for its side effect of recording the access.
            cache[cache_key]
        except KeyError:
            cache[cache_key] = int(obj_size)
    print(cache)
[1] 当需要根据某个元素是否存在于列表或字典中来决定执行什么操作时,这是推荐的写法,因为它只需访问容器一次。
官方文档中有一个现成的示例(recipe),见:https://docs.python.org/3/library/collections.html#collections.OrderedDict
class LRU(OrderedDict):
    """Limit size, evicting the least recently used key when full.

    A key counts as used when it is looked up with ``lru[key]`` or when it is
    assigned with ``lru[key] = value``.
    """

    def __init__(self, maxsize=128, *args, **kwds):
        """Create the mapping.

        Args:
            maxsize: maximum number of entries kept.
            *args, **kwds: initial items, forwarded to ``OrderedDict``.
        """
        # Set before the base initialiser runs, because initial items are
        # inserted through __setitem__, which reads maxsize.
        self.maxsize = maxsize
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        """Return the value for ``key`` and mark it as most recently used.

        Raises:
            KeyError: if ``key`` is absent.
        """
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, evicting the oldest entry if over ``maxsize``."""
        # Assigning to an existing key keeps its old position in an
        # OrderedDict; refresh it so a just-updated key is not evicted next.
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        if len(self) > self.maxsize:
            oldest = next(iter(self))
            del self[oldest]