Python 中使用优先级队列的 LRU 缓存实现

LRU cache implementation in Python using priority queue

我有以下实现LRU缓存的代码。

from __future__ import annotations

from time import time

import heapq

from typing import List, Dict, TypeVar, Generic, Optional, Tuple

# LRU Cache
T = TypeVar('T')


class Element:
    def __init__(self, key: str) -> None:
        self.key = key
        self.unixtime = time()

    def __lt__(self, other: Element) -> bool:
        return self.unixtime < other.unixtime

    def __eq__(self, other: Element) -> bool:
        return self.unixtime == other.unixtime

    def __gt__(self, other: Element) -> bool:
        return (not self.unixtime < other.unixtime) and self.unixtime != other.unixtime

    def __repr__(self) -> str:
        return f'({self.key}, {self.unixtime})'


class PriorityQueue(Generic[T], list):
    def __init__(self) -> None:
        self._data: List[Optional[T]] = []
        super().__init__()

    @property
    def is_empty(self) -> bool:
        return not self._data

    def push(self, v: T) -> None:
        heapq.heappush(self._data, v)

    def popq(self) -> Optional[T]:
        if not self.is_empty:
            return heapq.heappop(self._data)
        else:
            return None

    def __repr__(self) -> str:
        return repr(self._data)


class LRUCache:
    def __init__(self, limit: int) -> None:
        self._data: Dict[str, int] = {}
        self.limit = limit
        self._keyqueue: PriorityQueue[Element] = PriorityQueue()

    def put(self, key: str, value: T) -> None:
        if len(self._data) < self.limit:    # there is still room in the cache
            if key not in self._data:
                self._keyqueue.push(Element(key))
            else:
                correct_key = [item for item in self._keyqueue._data if item.key == key][0]
                ind = self._keyqueue._data.index(correct_key)
                self._keyqueue._data[ind].unixtime = time()
            self._data[key] = value
        else:                               # cache is full
            if key not in self._data:
                out_key = self._keyqueue.popq()
                self._data.pop(out_key.key)
                self._keyqueue.push(Element(key))
            else:
                correct_key = [item for item in self._keyqueue._data if item.key == key][0]
                ind = self._keyqueue._data.index(correct_key)
                self._keyqueue._data[ind].unixtime = time()
            self._data[key] = value

    def get(self, key: str) -> Optional[T]:
        if key in self._data:
            correct_key = [item for item in self._keyqueue._data if item.key == key][0]
            ind = self._keyqueue._data.index(correct_key)
            self._keyqueue._data[ind].unixtime = time()
            return self._data[key]
        else:
            raise KeyError('Key not found in cache')

    def __repr__(self) -> str:
        return repr(self._data)

cache = LRUCache(3)
cache.put('owen', 45)
cache.put('john', 32)
cache.put('box', 4556)

cache.get('owen')
cache.get('owen')

cache.put('new', 9)
cache

我使用Elementclass的unixtime属性来决定优先级。我正在使用 heapq 模块和一个列表来实现优先级队列。也许这不是在 Python 中实现 LRU 缓存的最有效方法,但这是我想出的方法。

我的问题是,在我使用 .get() 访问 owen 密钥两次然后发出 cache.put('new', 9) - 它应该删除 john 因为它是最近最少使用的.相反,它会删除 owen.
我检查了 _keyqueueowen 最高 unixtimejohn 最低,据我了解, Python 中的 heapq 模块使用 min_heap 所以 john 记录应该被新值替换。我在这里错过了什么?

我终于发现了问题所在: 每当更新时间时,我们需要在更新后对堆数据调用heapq.heapify()。 我也写了一个稍微高效的实现,如果有人需要的话:

from typing import List, Optional, TypeVar, Tuple, Dict, Generic

from time import time

import heapq

T = TypeVar('T')


class LRUTuple(tuple):
    def __init__(self, key: Tuple[str]) -> None:
        self.key = key
        self.time = time()

    def __lt__(self, other) -> bool:
        return self.time < other.time

    def __gt__(self, other) -> bool:
        return not self.time < other.time


# test class
a = LRUTuple(('owen',))
b = LRUTuple(('aheek',))
assert b > a
assert a < b


class PriorityQueue(Generic[T]):
    def __init__(self) -> None:
        self._data: List[T] = []

    @property
    def is_empty(self) -> bool:
        return not self._data

    def add(self, v: T) -> None:
        heapq.heappush(self._data, v)

    def pop_queue(self) -> Optional[T]:
        if not self.is_empty:
            return heapq.heappop(self._data)
        else:
            print('Empty Queue')
            return None

    def _heapify(self) -> None:
        heapq.heapify(self._data)

    def peek(self) -> Optional[T]:
        if not self.is_empty:
            return self._data[0]
        else:
            print('Empty Queue')
            return None

    def __repr__(self) -> str:
        return repr(self._data)


class LRUCache:
    def __init__(self, limit: int) -> None:
        self._data: Dict[str, T] = {}
        self.limit = limit
        self._keyqueue: PriorityQueue[LRUTuple] = PriorityQueue()

    def _update_key_time(self, key: str) -> None:
        self._keyqueue._data[self._keyqueue._data.index((key,))].time = time()
        self._keyqueue._heapify()

    def put(self, key: str, value: T) -> None:
        if len(self._keyqueue._data) < self.limit:
            if key not in self._data:
                self._data[key] = value
                self._keyqueue.add(LRUTuple((key,)))
            else:
                self._data[key] = value
                self._update_key_time(key)
        else:
            # remove lru key
            poped_key = self._keyqueue.pop_queue()
            self._data.pop(poped_key[0])
            self.put(key, value)

    def get(self, key: str) -> Optional[T]:
        if key in self._data:
            self._update_key_time(key)
            return self._data[key]
        else:
            print('KeyError: key not found')
            return None

    def __repr__(self) -> str:
        return repr([(k[0], k.time) for k in self._keyqueue._data])


# test LRUCache usage
lr = LRUCache(3)
lr.put('owen', 54)
lr.put('arwen', 4)
lr.put('jiji', 98)
lr._keyqueue.peek()
lr.get('owen')
lr._keyqueue.peek()
lr
lr.put('bone', 7)   # should replace arwen!
lr
lr._keyqueue.peek()