LRU cache implementation in Python using priority queue
I have the following code implementing an LRU cache:
from __future__ import annotations
from time import time
import heapq
from typing import List, Dict, TypeVar, Generic, Optional

# LRU Cache
T = TypeVar('T')

class Element:
    def __init__(self, key: str) -> None:
        self.key = key
        self.unixtime = time()

    def __lt__(self, other: Element) -> bool:
        return self.unixtime < other.unixtime

    def __eq__(self, other: Element) -> bool:
        return self.unixtime == other.unixtime

    def __gt__(self, other: Element) -> bool:
        return (not self.unixtime < other.unixtime) and self.unixtime != other.unixtime

    def __repr__(self) -> str:
        return f'({self.key}, {self.unixtime})'

class PriorityQueue(Generic[T], list):
    def __init__(self) -> None:
        self._data: List[Optional[T]] = []
        super().__init__()

    @property
    def is_empty(self) -> bool:
        return not self._data

    def push(self, v: T) -> None:
        heapq.heappush(self._data, v)

    def popq(self) -> Optional[T]:
        if not self.is_empty:
            return heapq.heappop(self._data)
        else:
            return None

    def __repr__(self) -> str:
        return repr(self._data)

class LRUCache:
    def __init__(self, limit: int) -> None:
        self._data: Dict[str, int] = {}
        self.limit = limit
        self._keyqueue: PriorityQueue[Element] = PriorityQueue()

    def put(self, key: str, value: T) -> None:
        if len(self._data) < self.limit:  # there is still room in the cache
            if key not in self._data:
                self._keyqueue.push(Element(key))
            else:
                correct_key = [item for item in self._keyqueue._data if item.key == key][0]
                ind = self._keyqueue._data.index(correct_key)
                self._keyqueue._data[ind].unixtime = time()
            self._data[key] = value
        else:  # cache is full
            if key not in self._data:
                out_key = self._keyqueue.popq()
                self._data.pop(out_key.key)
                self._keyqueue.push(Element(key))
            else:
                correct_key = [item for item in self._keyqueue._data if item.key == key][0]
                ind = self._keyqueue._data.index(correct_key)
                self._keyqueue._data[ind].unixtime = time()
            self._data[key] = value

    def get(self, key: str) -> Optional[T]:
        if key in self._data:
            correct_key = [item for item in self._keyqueue._data if item.key == key][0]
            ind = self._keyqueue._data.index(correct_key)
            self._keyqueue._data[ind].unixtime = time()
            return self._data[key]
        else:
            raise KeyError('Key not found in cache')

    def __repr__(self) -> str:
        return repr(self._data)

cache = LRUCache(3)
cache.put('owen', 45)
cache.put('john', 32)
cache.put('box', 4556)
cache.get('owen')
cache.get('owen')
cache.put('new', 9)
cache
I use the unixtime attribute of the Element class to decide priority, and I implement the priority queue with the heapq module on top of a plain list. This may not be the most efficient way to build an LRU cache in Python, but it is what I came up with.

My problem: after I access the owen key twice via .get() and then issue cache.put('new', 9), the cache should evict john, since it is the least recently used entry. Instead, it evicts owen.

I inspected _keyqueue: owen has the highest unixtime and john the lowest. As I understand it, Python's heapq module implements a min-heap, so the john record should be the one replaced by the new value. What am I missing? A minimal reproduction of what I'm seeing is below.
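For reference, this snippet reproduces the inspection using only the classes defined above (the exact unixtime values will differ on every run):

cache = LRUCache(3)
cache.put('owen', 45)
cache.put('john', 32)
cache.put('box', 4556)
cache.get('owen')  # refreshes owen's unixtime in place
cache.get('owen')
print(cache._keyqueue)         # owen shows the largest unixtime of the three...
print(cache._keyqueue.popq())  # ...yet this pops owen, not john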
I finally found the problem: whenever a timestamp is updated, we need to call heapq.heapify() on the heap's underlying list after the update. Mutating unixtime in place silently breaks the heap invariant, so heappop no longer returns the smallest element. A minimal standalone demonstration is below.
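This shows the invariant breaking and heapify() repairing it, using plain ints, independent of the cache classes:

import heapq

heap = [3, 5, 7]            # already a valid min-heap
heap[0] = 9                 # mutate the root in place: invariant silently broken
print(heapq.heappop(heap))  # prints 9, not the true minimum 5

heap = [3, 5, 7]
heap[0] = 9
heapq.heapify(heap)         # repair the invariant after the mutation
print(heapq.heappop(heap))  # prints 5, as expected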
I also wrote a slightly more efficient implementation, in case anyone needs it:
from typing import List, Optional, TypeVar, Tuple, Dict, Generic
from time import time
import heapq

T = TypeVar('T')

class LRUTuple(tuple):
    # a 1-tuple (key,) carrying a mutable access time used for ordering
    def __init__(self, key: Tuple[str]) -> None:
        self.key = key
        self.time = time()

    def __lt__(self, other) -> bool:
        return self.time < other.time

    def __gt__(self, other) -> bool:
        return not self.time < other.time

# test class
a = LRUTuple(('owen',))
b = LRUTuple(('aheek',))
assert b > a  # b was created later, so it compares greater
assert a < b

class PriorityQueue(Generic[T]):
    def __init__(self) -> None:
        self._data: List[T] = []

    @property
    def is_empty(self) -> bool:
        return not self._data

    def add(self, v: T) -> None:
        heapq.heappush(self._data, v)

    def pop_queue(self) -> Optional[T]:
        if not self.is_empty:
            return heapq.heappop(self._data)
        else:
            print('Empty Queue')
            return None

    def _heapify(self) -> None:
        heapq.heapify(self._data)

    def peek(self) -> Optional[T]:
        if not self.is_empty:
            return self._data[0]
        else:
            print('Empty Queue')
            return None

    def __repr__(self) -> str:
        return repr(self._data)

class LRUCache:
    def __init__(self, limit: int) -> None:
        self._data: Dict[str, T] = {}
        self.limit = limit
        self._keyqueue: PriorityQueue[LRUTuple] = PriorityQueue()

    def _update_key_time(self, key: str) -> None:
        # tuple equality lets us find the entry by (key,); we must then
        # re-heapify because mutating .time breaks the heap invariant
        self._keyqueue._data[self._keyqueue._data.index((key,))].time = time()
        self._keyqueue._heapify()

    def put(self, key: str, value: T) -> None:
        if len(self._keyqueue._data) < self.limit:
            if key not in self._data:
                self._data[key] = value
                self._keyqueue.add(LRUTuple((key,)))
            else:
                self._data[key] = value
                self._update_key_time(key)
        elif key in self._data:
            # cache is full but the key is already present:
            # refresh value and recency without evicting anything
            self._data[key] = value
            self._update_key_time(key)
        else:
            # remove lru key, then retry the insert
            popped_key = self._keyqueue.pop_queue()
            self._data.pop(popped_key[0])
            self.put(key, value)

    def get(self, key: str) -> Optional[T]:
        if key in self._data:
            self._update_key_time(key)
            return self._data[key]
        else:
            print('KeyError: key not found')
            return None

    def __repr__(self) -> str:
        return repr([(k[0], k.time) for k in self._keyqueue._data])

# test LRUCache usage
lr = LRUCache(3)
lr.put('owen', 54)
lr.put('arwen', 4)
lr.put('jiji', 98)
lr._keyqueue.peek()
lr.get('owen')
lr._keyqueue.peek()
lr
lr.put('bone', 7)  # should replace arwen!
lr
lr._keyqueue.peek()
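As an aside, if you only need LRU behaviour rather than a heap exercise, the standard library's collections.OrderedDict keeps entries in insertion order and can move a key to the end in O(1), which avoids re-heapifying on every access. A minimal sketch for comparison (the class name ODLRUCache is mine, not from the code above):

from collections import OrderedDict

class ODLRUCache:
    def __init__(self, limit: int) -> None:
        self.limit = limit
        self._data: OrderedDict = OrderedDict()

    def put(self, key, value) -> None:
        if key in self._data:
            self._data.move_to_end(key)     # refresh recency on update
        self._data[key] = value
        if len(self._data) > self.limit:
            self._data.popitem(last=False)  # evict the least recently used

    def get(self, key):
        value = self._data[key]             # raises KeyError if missing
        self._data.move_to_end(key)         # refresh recency on access
        return value

Both put and get are O(1) here, versus the O(n) heapify() on every access in the heap-based version.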