Source code for agora.cache.cache

import time
from collections import OrderedDict
from threading import Lock
from typing import Optional


[docs]class BaseCache:
[docs] def set(self, key: bytes, value: bytes): raise NotImplementedError('BaseCache is an abstract class. Subclasses must implement `set`')
[docs] def get(self, key: bytes) -> Optional[bytes]: raise NotImplementedError('BaseCache is an abstract class. Subclasses must implement `get`')
[docs]class LRUCache(BaseCache): """An in-memory LRU cache. :param default_ttl: the default amount of time, in seconds, that an entry will be cached if no other TTL is provided. :param max_entries: the maximum number of entries the cache will hold. """ def __init__(self, default_ttl: int, max_entries: int): self._cache = OrderedDict() self._entry_expiry = {} self._lock = Lock() self._ttl = default_ttl self._max_entries = max_entries
[docs] def set(self, key: bytes, value: bytes, ttl: Optional[int] = None): ttl = ttl if ttl else self._ttl with self._lock: self._cache[key] = value self._cache.move_to_end(key, last=False) self._entry_expiry[key] = time.time() + ttl if len(self._cache) > self._max_entries: self._evict()
[docs] def get(self, key: bytes) -> Optional[bytes]: with self._lock: if not self._has_valid_entry(key): self._clear(key) return None value = self._cache.get(key, []) self._cache.move_to_end(key, last=False) return value
[docs] def clear_all(self): with self._lock: self._cache.clear() self._entry_expiry.clear()
def _has_valid_entry(self, key: bytes): exp = self._entry_expiry.get(key) return exp and exp > time.time() def _evict(self): key, _ = self._cache.popitem() del self._entry_expiry[key] def _clear(self, key): self._cache.pop(key, None) self._entry_expiry.pop(key, None)