Line data Source code
1 : use std::ops::{Deref, DerefMut};
2 : use std::time::{Duration, Instant};
3 :
4 : use moka::Expiry;
5 : use moka::notification::RemovalCause;
6 :
7 : use crate::control_plane::messages::ControlPlaneErrorMessage;
8 : use crate::metrics::{
9 : CacheEviction, CacheKind, CacheOutcome, CacheOutcomeGroup, CacheRemovalCause, Metrics,
10 : };
11 :
/// Time-to-live applied to cached control plane errors when nothing more
/// specific is configured (30 seconds).
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
14 :
/// Minimal abstraction over a cache: it names the key and value types and
/// provides a way to invalidate a single entry by its key.
///
/// [`Cached`] builds on this trait to remember where a value came from.
pub(crate) trait Cache {
    /// Type used to look an entry up.
    type Key;

    /// Type stored for each entry.
    type Value;

    /// Invalidate the entry identified by `key`.
    ///
    /// No default implementation is provided on purpose, since an empty
    /// default would be error-prone.
    fn invalidate(&self, key: &Self::Key);
}
29 :
30 : impl<C: Cache> Cache for &C {
31 : type Key = C::Key;
32 : type Value = C::Value;
33 :
34 6 : fn invalidate(&self, info: &Self::Key) {
35 6 : C::invalidate(self, info);
36 6 : }
37 : }
38 :
39 : /// Wrapper for convenient entry invalidation.
40 : pub(crate) struct Cached<C: Cache, V = <C as Cache>::Value> {
41 : /// Cache + lookup info.
42 : pub(crate) token: Option<(C, C::Key)>,
43 :
44 : /// The value itself.
45 : pub(crate) value: V,
46 : }
47 :
48 : impl<C: Cache, V> Cached<C, V> {
49 : /// Place any entry into this wrapper; invalidation will be a no-op.
50 1 : pub(crate) fn new_uncached(value: V) -> Self {
51 1 : Self { token: None, value }
52 1 : }
53 :
54 : /// Drop this entry from a cache if it's still there.
55 6 : pub(crate) fn invalidate(self) -> V {
56 6 : if let Some((cache, info)) = &self.token {
57 6 : cache.invalidate(info);
58 6 : }
59 6 : self.value
60 6 : }
61 :
62 : /// Tell if this entry is actually cached.
63 16 : pub(crate) fn cached(&self) -> bool {
64 16 : self.token.is_some()
65 16 : }
66 : }
67 :
68 : impl<C: Cache, V> Deref for Cached<C, V> {
69 : type Target = V;
70 :
71 0 : fn deref(&self) -> &Self::Target {
72 0 : &self.value
73 0 : }
74 : }
75 :
76 : impl<C: Cache, V> DerefMut for Cached<C, V> {
77 0 : fn deref_mut(&mut self) -> &mut Self::Target {
78 0 : &mut self.value
79 0 : }
80 : }
81 :
82 : pub type ControlPlaneResult<T> = Result<T, Box<ControlPlaneErrorMessage>>;
83 :
/// Expiry settings for cached control plane responses.
#[derive(Clone, Copy)]
pub struct CplaneExpiry {
    /// Lifetime given to a cached error that carries no retry information.
    pub error: Duration,
}
88 :
89 : impl Default for CplaneExpiry {
90 15 : fn default() -> Self {
91 15 : Self {
92 15 : error: DEFAULT_ERROR_TTL,
93 15 : }
94 15 : }
95 : }
96 :
97 : impl CplaneExpiry {
98 29 : pub fn expire_early<V>(
99 29 : &self,
100 29 : value: &ControlPlaneResult<V>,
101 29 : updated: Instant,
102 29 : ) -> Option<Duration> {
103 29 : match value {
104 25 : Ok(_) => None,
105 4 : Err(err) => Some(self.expire_err_early(err, updated)),
106 : }
107 29 : }
108 :
109 4 : pub fn expire_err_early(&self, err: &ControlPlaneErrorMessage, updated: Instant) -> Duration {
110 4 : err.status
111 4 : .as_ref()
112 4 : .and_then(|s| s.details.retry_info.as_ref())
113 4 : .map_or(self.error, |r| r.retry_at.into_std() - updated)
114 4 : }
115 : }
116 :
117 : impl<K, V> Expiry<K, ControlPlaneResult<V>> for CplaneExpiry {
118 24 : fn expire_after_create(
119 24 : &self,
120 24 : _key: &K,
121 24 : value: &ControlPlaneResult<V>,
122 24 : created_at: Instant,
123 24 : ) -> Option<Duration> {
124 24 : self.expire_early(value, created_at)
125 24 : }
126 :
127 5 : fn expire_after_update(
128 5 : &self,
129 5 : _key: &K,
130 5 : value: &ControlPlaneResult<V>,
131 5 : updated_at: Instant,
132 5 : _duration_until_expiry: Option<Duration>,
133 5 : ) -> Option<Duration> {
134 5 : self.expire_early(value, updated_at)
135 5 : }
136 : }
137 :
138 15 : pub fn eviction_listener(kind: CacheKind, cause: RemovalCause) {
139 15 : let cause = match cause {
140 3 : RemovalCause::Expired => CacheRemovalCause::Expired,
141 6 : RemovalCause::Explicit => CacheRemovalCause::Explicit,
142 5 : RemovalCause::Replaced => CacheRemovalCause::Replaced,
143 1 : RemovalCause::Size => CacheRemovalCause::Size,
144 : };
145 15 : Metrics::get()
146 15 : .cache
147 15 : .evicted_total
148 15 : .inc(CacheEviction { cache: kind, cause });
149 15 : }
150 :
151 : #[inline]
152 11 : pub fn count_cache_outcome<T>(kind: CacheKind, cache_result: Option<T>) -> Option<T> {
153 11 : let outcome = if cache_result.is_some() {
154 10 : CacheOutcome::Hit
155 : } else {
156 1 : CacheOutcome::Miss
157 : };
158 11 : Metrics::get().cache.request_total.inc(CacheOutcomeGroup {
159 11 : cache: kind,
160 11 : outcome,
161 11 : });
162 11 : cache_result
163 11 : }
164 :
165 : #[inline]
166 29 : pub fn count_cache_insert(kind: CacheKind) {
167 29 : Metrics::get().cache.inserted_total.inc(kind);
168 29 : }
|