"""
Provide caching that stores cache items in volatile memory.
"""
from __future__ import annotations
import multiprocessing
from collections.abc import MutableMapping, Sequence
from typing import TypeAlias, Generic, Self, TypeVar, final, TYPE_CHECKING
from typing_extensions import override
from betty.cache import CacheItem
from betty.cache._base import _CommonCacheBase, _StaticCacheItem, _CommonCacheBaseState
from betty.typing import processsafe
if TYPE_CHECKING:
from betty.concurrent import Ledger, AsynchronizedLock
from multiprocessing.managers import SyncManager
_CacheItemValueContraT = TypeVar("_CacheItemValueContraT", contravariant=True)
_MemoryCacheStore: TypeAlias = MutableMapping[
tuple[str, ...],
"CacheItem[_CacheItemValueContraT] | None | _MemoryCacheStore[_CacheItemValueContraT]",
]
@final
class _MemoryCacheState(
Generic[_CacheItemValueContraT],
_CommonCacheBaseState["MemoryCache[_CacheItemValueContraT]"],
):
def __init__(
self,
cache_lock: AsynchronizedLock,
cache_item_lock_ledger: Ledger,
store: _MemoryCacheStore[_CacheItemValueContraT],
):
super().__init__(cache_lock, cache_item_lock_ledger)
self.store = store
[docs]
@final
@processsafe
class MemoryCache(
_CommonCacheBase[_CacheItemValueContraT], Generic[_CacheItemValueContraT]
):
"""
Provide a cache that stores cache items in volatile memory.
"""
[docs]
def __init__(
self,
*,
scopes: Sequence[str] | None = None,
manager: SyncManager | _MemoryCacheState[_CacheItemValueContraT] | None = None,
):
super().__init__(scopes=scopes, manager=manager)
if isinstance(manager, _MemoryCacheState):
self._store = manager.store
else:
self._store = multiprocessing.Manager().dict()
[docs]
@override
def with_scope(self, scope: str) -> Self:
return type(self)(
scopes=(*self._scopes, scope),
manager=_MemoryCacheState[_CacheItemValueContraT](
self._cache_lock, self._cache_item_lock_ledger, self._store
),
)
def _cache_item_key(self, cache_item_id: str) -> tuple[str, ...]:
return *self._scopes, cache_item_id
@override
async def _get(
self, cache_item_id: str
) -> CacheItem[_CacheItemValueContraT] | None:
cache_item = self._store.get(self._cache_item_key(cache_item_id), None)
if isinstance(cache_item, CacheItem):
return cache_item
return None
@override
async def _set(
self,
cache_item_id: str,
value: _CacheItemValueContraT,
*,
modified: int | float | None = None,
) -> None:
self._store[self._cache_item_key(cache_item_id)] = _StaticCacheItem(
value, modified
)
@override
async def _delete(self, cache_item_id: str) -> None:
self._store.pop(self._cache_item_key(cache_item_id), None)
@override
async def _clear(self) -> None:
self._store.clear()