LCOV - code coverage report
Current view: top level - proxy/src - cache.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 20.3 % 79 16
Test Date: 2023-09-06 10:18:01 Functions: 20.0 % 20 4

            Line data    Source code
       1              : use std::{
       2              :     borrow::Borrow,
       3              :     hash::Hash,
       4              :     ops::{Deref, DerefMut},
       5              :     time::{Duration, Instant},
       6              : };
       7              : use tracing::debug;
       8              : 
       9              : // This seems to make more sense than `lru` or `cached`:
      10              : //
      11              : // * `near/nearcore` ditched `cached` in favor of `lru`
      12              : //   (https://github.com/near/nearcore/issues?q=is%3Aissue+lru+is%3Aclosed).
      13              : //
      14              : // * `lru` methods use an obscure `KeyRef` type in their constraints (which is deliberately excluded from docs).
      15              : //   This severely hinders its usage both in terms of creating wrappers and supported key types.
      16              : //
      17              : // On the other hand, `hashlink` has good download stats and appears to be maintained.
      18              : use hashlink::{linked_hash_map::RawEntryMut, LruCache};
      19              : 
      20              : /// A generic trait which exposes types of cache's key and value,
      21              : /// as well as the notion of cache entry invalidation.
      22              : /// This is useful for [`timed_lru::Cached`].
      23              : pub trait Cache {
      24              :     /// Entry's key.
      25              :     type Key;
      26              : 
      27              :     /// Entry's value.
      28              :     type Value;
      29              : 
      30              :     /// Used for entry invalidation.
      31              :     type LookupInfo<Key>;
      32              : 
      33              :     /// Invalidate an entry using a lookup info.
      34              :     /// We don't have an empty default impl because it's error-prone.
      35              :     fn invalidate(&self, _: &Self::LookupInfo<Self::Key>);
      36              : }
      37              : 
      38              : impl<C: Cache> Cache for &C {
      39              :     type Key = C::Key;
      40              :     type Value = C::Value;
      41              :     type LookupInfo<Key> = C::LookupInfo<Key>;
      42              : 
      43            0 :     fn invalidate(&self, info: &Self::LookupInfo<Self::Key>) {
      44            0 :         C::invalidate(self, info)
      45            0 :     }
      46              : }
      47              : 
      48              : pub use timed_lru::TimedLru;
      49              : pub mod timed_lru {
      50              :     use super::*;
      51              : 
      52              :     /// An implementation of timed LRU cache with fixed capacity.
      53              :     /// Key properties:
      54              :     ///
      55              :     /// * Whenever a new entry is inserted, the least recently accessed one is evicted.
      56              :     ///   The cache also keeps track of entry's insertion time (`created_at`) and TTL (`expires_at`).
      57              :     ///
      58              :     /// * When the entry is about to be retrieved, we check its expiration timestamp.
      59              :     ///   If the entry has expired, we remove it from the cache; Otherwise we bump the
      60              :     ///   expiration timestamp (e.g. +5mins) and change its place in LRU list to prolong
      61              :     ///   its existence.
      62              :     ///
      63              :     /// * There's an API for immediate invalidation (removal) of a cache entry;
      64              :     ///   It's useful in case we know for sure that the entry is no longer correct.
      65              :     ///   See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information.
      66              :     ///
      67              :     /// * Expired entries are kept in the cache until they are evicted by the LRU policy
      68              :     ///   or removed by a lookup or an invalidation that finds them expired.
      69              :     ///   There is no background job to reap the expired records.
      70              :     ///
      71              :     /// * It's possible for an entry that has not yet expired to be evicted
      72              :     ///   before expired items. That's a bit wasteful, but probably fine in practice.
      73              :     pub struct TimedLru<K, V> {
      74              :         /// Cache's name for tracing.
      75              :         name: &'static str,
      76              : 
      77              :         /// The underlying cache implementation.
      78              :         cache: parking_lot::Mutex<LruCache<K, Entry<V>>>,
      79              : 
      80              :         /// Default time-to-live of a single entry.
      81              :         ttl: Duration,
      82              :     }
      83              : 
      84              :     impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
      85              :         type Key = K;
      86              :         type Value = V;
      87              :         type LookupInfo<Key> = LookupInfo<Key>;
      88              : 
      89            0 :         fn invalidate(&self, info: &Self::LookupInfo<K>) {
      90            0 :             self.invalidate_raw(info)
      91            0 :         }
      92              :     }
      93              : 
      94              :     struct Entry<T> {
      95              :         created_at: Instant,
      96              :         expires_at: Instant,
      97              :         value: T,
      98              :     }
      99              : 
     100              :     impl<K: Hash + Eq, V> TimedLru<K, V> {
     101              :         /// Construct a new LRU cache with timed entries.
     102            0 :         pub fn new(name: &'static str, capacity: usize, ttl: Duration) -> Self {
     103            0 :             Self {
     104            0 :                 name,
     105            0 :                 cache: LruCache::new(capacity).into(),
     106            0 :                 ttl,
     107            0 :             }
     108            0 :         }
     109              : 
     110              :         /// Drop an entry from the cache if it's outdated.
     111            0 :         #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
     112              :         fn invalidate_raw(&self, info: &LookupInfo<K>) {
     113              :             let now = Instant::now();
     114              : 
     115              :             // Do costly things before taking the lock.
     116              :             let mut cache = self.cache.lock();
     117              :             let raw_entry = match cache.raw_entry_mut().from_key(&info.key) {
     118              :                 RawEntryMut::Vacant(_) => return,
     119              :                 RawEntryMut::Occupied(x) => x,
     120              :             };
     121              : 
     122              :             // Remove the entry if it was created prior to lookup timestamp.
     123              :             let entry = raw_entry.get();
     124              :             let (created_at, expires_at) = (entry.created_at, entry.expires_at);
     125              :             let should_remove = created_at <= info.created_at || expires_at <= now;
     126              : 
     127              :             if should_remove {
     128              :                 raw_entry.remove();
     129              :             }
     130              : 
     131              :             drop(cache); // drop lock before logging
     132            0 :             debug!(
     133            0 :                 created_at = format_args!("{created_at:?}"),
     134            0 :                 expires_at = format_args!("{expires_at:?}"),
     135            0 :                 entry_removed = should_remove,
     136            0 :                 "processed a cache entry invalidation event"
     137            0 :             );
     138              :         }
     139              : 
     140              :         /// Try retrieving an entry by its key, then execute `extract` if it exists.
     141            0 :         #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
     142              :         fn get_raw<Q, R>(&self, key: &Q, extract: impl FnOnce(&K, &Entry<V>) -> R) -> Option<R>
     143              :         where
     144              :             K: Borrow<Q>,
     145              :             Q: Hash + Eq + ?Sized,
     146              :         {
     147              :             let now = Instant::now();
     148              :             let deadline = now.checked_add(self.ttl).expect("time overflow");
     149              : 
     150              :             // Do costly things before taking the lock.
     151              :             let mut cache = self.cache.lock();
     152              :             let mut raw_entry = match cache.raw_entry_mut().from_key(key) {
     153              :                 RawEntryMut::Vacant(_) => return None,
     154              :                 RawEntryMut::Occupied(x) => x,
     155              :             };
     156              : 
     157              :             // Immediately drop the entry if it has expired.
     158              :             let entry = raw_entry.get();
     159              :             if entry.expires_at <= now {
     160              :                 raw_entry.remove();
     161              :                 return None;
     162              :             }
     163              : 
     164              :             let value = extract(raw_entry.key(), entry);
     165              :             let (created_at, expires_at) = (entry.created_at, entry.expires_at);
     166              : 
     167              :             // Update the deadline and the entry's position in the LRU list.
     168              :             raw_entry.get_mut().expires_at = deadline;
     169              :             raw_entry.to_back();
     170              : 
     171              :             drop(cache); // drop lock before logging
     172            0 :             debug!(
     173            0 :                 created_at = format_args!("{created_at:?}"),
     174            0 :                 old_expires_at = format_args!("{expires_at:?}"),
     175            0 :                 new_expires_at = format_args!("{deadline:?}"),
     176            0 :                 "accessed a cache entry"
     177            0 :             );
     178              : 
     179              :             Some(value)
     180              :         }
     181              : 
     182              :         /// Insert an entry into the cache. Return the new entry's creation timestamp
     183              :         /// and, if an entry with the same key already existed, the previous value.
     184            0 :         #[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
     185              :         fn insert_raw(&self, key: K, value: V) -> (Instant, Option<V>) {
     186              :             let created_at = Instant::now();
     187              :             let expires_at = created_at.checked_add(self.ttl).expect("time overflow");
     188              : 
     189              :             let entry = Entry {
     190              :                 created_at,
     191              :                 expires_at,
     192              :                 value,
     193              :             };
     194              : 
     195              :             // Do costly things before taking the lock.
     196              :             let old = self
     197              :                 .cache
     198              :                 .lock()
     199              :                 .insert(key, entry)
     200            0 :                 .map(|entry| entry.value);
     201              : 
     202            0 :             debug!(
     203            0 :                 created_at = format_args!("{created_at:?}"),
     204            0 :                 expires_at = format_args!("{expires_at:?}"),
     205            0 :                 replaced = old.is_some(),
     206            0 :                 "created a cache entry"
     207            0 :             );
     208              : 
     209              :             (created_at, old)
     210              :         }
     211              :     }
     212              : 
     213              :     impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
     214            0 :         pub fn insert(&self, key: K, value: V) -> (Option<V>, Cached<&Self>) {
     215            0 :             let (created_at, old) = self.insert_raw(key.clone(), value.clone());
     216            0 : 
     217            0 :             let cached = Cached {
     218            0 :                 token: Some((self, LookupInfo { created_at, key })),
     219            0 :                 value,
     220            0 :             };
     221            0 : 
     222            0 :             (old, cached)
     223            0 :         }
     224              :     }
     225              : 
     226              :     impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
     227              :         /// Retrieve a cached entry in convenient wrapper.
     228            0 :         pub fn get<Q>(&self, key: &Q) -> Option<timed_lru::Cached<&Self>>
     229            0 :         where
     230            0 :             K: Borrow<Q> + Clone,
     231            0 :             Q: Hash + Eq + ?Sized,
     232            0 :         {
     233            0 :             self.get_raw(key, |key, entry| {
     234            0 :                 let info = LookupInfo {
     235            0 :                     created_at: entry.created_at,
     236            0 :                     key: key.clone(),
     237            0 :                 };
     238            0 : 
     239            0 :                 Cached {
     240            0 :                     token: Some((self, info)),
     241            0 :                     value: entry.value.clone(),
     242            0 :                 }
     243            0 :             })
     244            0 :         }
     245              :     }
     246              : 
     247              :     /// Lookup information for key invalidation.
     248              :     pub struct LookupInfo<K> {
     249              :         /// Time of creation of a cache [`Entry`].
     250              :         /// We use this during invalidation lookups to prevent eviction of a newer
     251              :         /// entry sharing the same key (it might've been inserted by a different
     252              :         /// task after we got the entry we're trying to invalidate now).
     253              :         created_at: Instant,
     254              : 
     255              :         /// Search by this key.
     256              :         key: K,
     257              :     }
     258              : 
     259              :     /// Wrapper for convenient entry invalidation.
     260              :     pub struct Cached<C: Cache> {
     261              :         /// Cache + lookup info.
     262              :         token: Option<(C, C::LookupInfo<C::Key>)>,
     263              : 
     264              :         /// The value itself.
     265              :         value: C::Value,
     266              :     }
     267              : 
     268              :     impl<C: Cache> Cached<C> {
     269              :         /// Place any entry into this wrapper; invalidation will be a no-op.
     270           61 :         pub fn new_uncached(value: C::Value) -> Self {
     271           61 :             Self { token: None, value }
     272           61 :         }
     273              : 
     274              :         /// Drop this entry from a cache if it's still there.
     275              :         pub fn invalidate(self) -> C::Value {
     276            8 :             if let Some((cache, info)) = &self.token {
     277            0 :                 cache.invalidate(info);
     278            8 :             }
     279            8 :             self.value
     280            8 :         }
     281              : 
     282              :         /// Tell if this entry is actually cached.
     283            8 :         pub fn cached(&self) -> bool {
     284            8 :             self.token.is_some()
     285            8 :         }
     286              :     }
     287              : 
     288              :     impl<C: Cache> Deref for Cached<C> {
     289              :         type Target = C::Value;
     290              : 
     291          103 :         fn deref(&self) -> &Self::Target {
     292          103 :             &self.value
     293          103 :         }
     294              :     }
     295              : 
     296              :     impl<C: Cache> DerefMut for Cached<C> {
     297          119 :         fn deref_mut(&mut self) -> &mut Self::Target {
     298          119 :             &mut self.value
     299          119 :         }
     300              :     }
     301              : }
        

Generated by: LCOV version 2.1-beta