LCOV - code coverage report
Current view: top level - proxy/src/cache - project_info.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 85.7 % 434 372
Test Date: 2024-10-22 22:13:45 Functions: 75.0 % 36 27

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::convert::Infallible;
       3              : use std::sync::atomic::AtomicU64;
       4              : use std::sync::Arc;
       5              : use std::time::Duration;
       6              : 
       7              : use async_trait::async_trait;
       8              : use dashmap::DashMap;
       9              : use rand::{thread_rng, Rng};
      10              : use smol_str::SmolStr;
      11              : use tokio::sync::Mutex;
      12              : use tokio::time::Instant;
      13              : use tracing::{debug, info};
      14              : 
      15              : use super::{Cache, Cached};
      16              : use crate::auth::IpPattern;
      17              : use crate::config::ProjectInfoCacheOptions;
      18              : use crate::control_plane::AuthSecret;
      19              : use crate::intern::{EndpointIdInt, ProjectIdInt, RoleNameInt};
      20              : use crate::{EndpointId, RoleName};
      21              : 
      22              : #[async_trait]
      23              : pub(crate) trait ProjectInfoCache {
      24              :     fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt);
      25              :     fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
      26              :     async fn decrement_active_listeners(&self);
      27              :     async fn increment_active_listeners(&self);
      28              : }
      29              : 
      30              : struct Entry<T> {
      31              :     created_at: Instant,
      32              :     value: T,
      33              : }
      34              : 
      35              : impl<T> Entry<T> {
      36            9 :     pub(crate) fn new(value: T) -> Self {
      37            9 :         Self {
      38            9 :             created_at: Instant::now(),
      39            9 :             value,
      40            9 :         }
      41            9 :     }
      42              : }
      43              : 
      44              : impl<T> From<T> for Entry<T> {
      45            9 :     fn from(value: T) -> Self {
      46            9 :         Self::new(value)
      47            9 :     }
      48              : }
      49              : 
      50              : #[derive(Default)]
      51              : struct EndpointInfo {
      52              :     secret: std::collections::HashMap<RoleNameInt, Entry<Option<AuthSecret>>>,
      53              :     allowed_ips: Option<Entry<Arc<Vec<IpPattern>>>>,
      54              : }
      55              : 
      56              : impl EndpointInfo {
      57           11 :     fn check_ignore_cache(ignore_cache_since: Option<Instant>, created_at: Instant) -> bool {
      58           11 :         match ignore_cache_since {
      59            3 :             None => false,
      60            8 :             Some(t) => t < created_at,
      61              :         }
      62           11 :     }
      63           15 :     pub(crate) fn get_role_secret(
      64           15 :         &self,
      65           15 :         role_name: RoleNameInt,
      66           15 :         valid_since: Instant,
      67           15 :         ignore_cache_since: Option<Instant>,
      68           15 :     ) -> Option<(Option<AuthSecret>, bool)> {
      69           15 :         if let Some(secret) = self.secret.get(&role_name) {
      70           13 :             if valid_since < secret.created_at {
      71            7 :                 return Some((
      72            7 :                     secret.value.clone(),
      73            7 :                     Self::check_ignore_cache(ignore_cache_since, secret.created_at),
      74            7 :                 ));
      75            6 :             }
      76            2 :         }
      77            8 :         None
      78           15 :     }
      79              : 
      80            5 :     pub(crate) fn get_allowed_ips(
      81            5 :         &self,
      82            5 :         valid_since: Instant,
      83            5 :         ignore_cache_since: Option<Instant>,
      84            5 :     ) -> Option<(Arc<Vec<IpPattern>>, bool)> {
      85            5 :         if let Some(allowed_ips) = &self.allowed_ips {
      86            5 :             if valid_since < allowed_ips.created_at {
      87            4 :                 return Some((
      88            4 :                     allowed_ips.value.clone(),
      89            4 :                     Self::check_ignore_cache(ignore_cache_since, allowed_ips.created_at),
      90            4 :                 ));
      91            1 :             }
      92            0 :         }
      93            1 :         None
      94            5 :     }
      95            0 :     pub(crate) fn invalidate_allowed_ips(&mut self) {
      96            0 :         self.allowed_ips = None;
      97            0 :     }
      98            1 :     pub(crate) fn invalidate_role_secret(&mut self, role_name: RoleNameInt) {
      99            1 :         self.secret.remove(&role_name);
     100            1 :     }
     101              : }
     102              : 
     103              : /// Cache for project info.
     104              : /// This is used to cache auth data for endpoints.
     105              : /// Invalidation is done by console notifications or by TTL (if console notifications are disabled).
     106              : ///
     107              : /// We also store endpoint-to-project mapping in the cache, to be able to access per-endpoint data.
     108              : /// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
     109              : /// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
     110              : pub struct ProjectInfoCacheImpl {
     111              :     cache: DashMap<EndpointIdInt, EndpointInfo>,
     112              : 
     113              :     project2ep: DashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
     114              :     config: ProjectInfoCacheOptions,
     115              : 
     116              :     start_time: Instant,
     117              :     ttl_disabled_since_us: AtomicU64,
     118              :     active_listeners_lock: Mutex<usize>,
     119              : }
     120              : 
     121              : #[async_trait]
     122              : impl ProjectInfoCache for ProjectInfoCacheImpl {
     123            0 :     fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt) {
     124            0 :         info!("invalidating allowed ips for project `{}`", project_id);
     125            0 :         let endpoints = self
     126            0 :             .project2ep
     127            0 :             .get(&project_id)
     128            0 :             .map(|kv| kv.value().clone())
     129            0 :             .unwrap_or_default();
     130            0 :         for endpoint_id in endpoints {
     131            0 :             if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
     132            0 :                 endpoint_info.invalidate_allowed_ips();
     133            0 :             }
     134              :         }
     135            0 :     }
     136            1 :     fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt) {
     137            1 :         info!(
     138            0 :             "invalidating role secret for project_id `{}` and role_name `{}`",
     139              :             project_id, role_name,
     140              :         );
     141            1 :         let endpoints = self
     142            1 :             .project2ep
     143            1 :             .get(&project_id)
     144            1 :             .map(|kv| kv.value().clone())
     145            1 :             .unwrap_or_default();
     146            2 :         for endpoint_id in endpoints {
     147            1 :             if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
     148            1 :                 endpoint_info.invalidate_role_secret(role_name);
     149            1 :             }
     150              :         }
     151            1 :     }
     152            0 :     async fn decrement_active_listeners(&self) {
     153            0 :         let mut listeners_guard = self.active_listeners_lock.lock().await;
     154            0 :         if *listeners_guard == 0 {
     155            0 :             tracing::error!("active_listeners count is already 0, something is broken");
     156            0 :             return;
     157            0 :         }
     158            0 :         *listeners_guard -= 1;
     159            0 :         if *listeners_guard == 0 {
     160            0 :             self.ttl_disabled_since_us
     161            0 :                 .store(u64::MAX, std::sync::atomic::Ordering::SeqCst);
     162            0 :         }
     163            0 :     }
     164              : 
     165            2 :     async fn increment_active_listeners(&self) {
     166            2 :         let mut listeners_guard = self.active_listeners_lock.lock().await;
     167            2 :         *listeners_guard += 1;
     168            2 :         if *listeners_guard == 1 {
     169            2 :             let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64;
     170            2 :             self.ttl_disabled_since_us
     171            2 :                 .store(new_ttl, std::sync::atomic::Ordering::SeqCst);
     172            2 :         }
     173            4 :     }
     174              : }
     175              : 
     176              : impl ProjectInfoCacheImpl {
     177            3 :     pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
     178            3 :         Self {
     179            3 :             cache: DashMap::new(),
     180            3 :             project2ep: DashMap::new(),
     181            3 :             config,
     182            3 :             ttl_disabled_since_us: AtomicU64::new(u64::MAX),
     183            3 :             start_time: Instant::now(),
     184            3 :             active_listeners_lock: Mutex::new(0),
     185            3 :         }
     186            3 :     }
     187              : 
     188           15 :     pub(crate) fn get_role_secret(
     189           15 :         &self,
     190           15 :         endpoint_id: &EndpointId,
     191           15 :         role_name: &RoleName,
     192           15 :     ) -> Option<Cached<&Self, Option<AuthSecret>>> {
     193           15 :         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
     194           15 :         let role_name = RoleNameInt::get(role_name)?;
     195           15 :         let (valid_since, ignore_cache_since) = self.get_cache_times();
     196           15 :         let endpoint_info = self.cache.get(&endpoint_id)?;
     197            7 :         let (value, ignore_cache) =
     198           15 :             endpoint_info.get_role_secret(role_name, valid_since, ignore_cache_since)?;
     199            7 :         if !ignore_cache {
     200            4 :             let cached = Cached {
     201            4 :                 token: Some((
     202            4 :                     self,
     203            4 :                     CachedLookupInfo::new_role_secret(endpoint_id, role_name),
     204            4 :                 )),
     205            4 :                 value,
     206            4 :             };
     207            4 :             return Some(cached);
     208            3 :         }
     209            3 :         Some(Cached::new_uncached(value))
     210           15 :     }
     211            5 :     pub(crate) fn get_allowed_ips(
     212            5 :         &self,
     213            5 :         endpoint_id: &EndpointId,
     214            5 :     ) -> Option<Cached<&Self, Arc<Vec<IpPattern>>>> {
     215            5 :         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
     216            5 :         let (valid_since, ignore_cache_since) = self.get_cache_times();
     217            5 :         let endpoint_info = self.cache.get(&endpoint_id)?;
     218            5 :         let value = endpoint_info.get_allowed_ips(valid_since, ignore_cache_since);
     219            5 :         let (value, ignore_cache) = value?;
     220            4 :         if !ignore_cache {
     221            1 :             let cached = Cached {
     222            1 :                 token: Some((self, CachedLookupInfo::new_allowed_ips(endpoint_id))),
     223            1 :                 value,
     224            1 :             };
     225            1 :             return Some(cached);
     226            3 :         }
     227            3 :         Some(Cached::new_uncached(value))
     228            5 :     }
     229            7 :     pub(crate) fn insert_role_secret(
     230            7 :         &self,
     231            7 :         project_id: ProjectIdInt,
     232            7 :         endpoint_id: EndpointIdInt,
     233            7 :         role_name: RoleNameInt,
     234            7 :         secret: Option<AuthSecret>,
     235            7 :     ) {
     236            7 :         if self.cache.len() >= self.config.size {
     237              :             // If there are too many entries, wait until the next gc cycle.
     238            0 :             return;
     239            7 :         }
     240            7 :         self.insert_project2endpoint(project_id, endpoint_id);
     241            7 :         let mut entry = self.cache.entry(endpoint_id).or_default();
     242            7 :         if entry.secret.len() < self.config.max_roles {
     243            6 :             entry.secret.insert(role_name, secret.into());
     244            6 :         }
     245            7 :     }
     246            3 :     pub(crate) fn insert_allowed_ips(
     247            3 :         &self,
     248            3 :         project_id: ProjectIdInt,
     249            3 :         endpoint_id: EndpointIdInt,
     250            3 :         allowed_ips: Arc<Vec<IpPattern>>,
     251            3 :     ) {
     252            3 :         if self.cache.len() >= self.config.size {
     253              :             // If there are too many entries, wait until the next gc cycle.
     254            0 :             return;
     255            3 :         }
     256            3 :         self.insert_project2endpoint(project_id, endpoint_id);
     257            3 :         self.cache.entry(endpoint_id).or_default().allowed_ips = Some(allowed_ips.into());
     258            3 :     }
     259           10 :     fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
     260           10 :         if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) {
     261            7 :             endpoints.insert(endpoint_id);
     262            7 :         } else {
     263            3 :             self.project2ep
     264            3 :                 .insert(project_id, HashSet::from([endpoint_id]));
     265            3 :         }
     266           10 :     }
     267           20 :     fn get_cache_times(&self) -> (Instant, Option<Instant>) {
     268           20 :         let mut valid_since = Instant::now() - self.config.ttl;
     269           20 :         // Only ignore cache if ttl is disabled.
     270           20 :         let ttl_disabled_since_us = self
     271           20 :             .ttl_disabled_since_us
     272           20 :             .load(std::sync::atomic::Ordering::Relaxed);
     273           20 :         let ignore_cache_since = if ttl_disabled_since_us == u64::MAX {
     274            7 :             None
     275              :         } else {
     276           13 :             let ignore_cache_since = self.start_time + Duration::from_micros(ttl_disabled_since_us);
     277           13 :             // We are fine if entry is not older than ttl or was added before we are getting notifications.
     278           13 :             valid_since = valid_since.min(ignore_cache_since);
     279           13 :             Some(ignore_cache_since)
     280              :         };
     281           20 :         (valid_since, ignore_cache_since)
     282           20 :     }
     283              : 
     284            0 :     pub async fn gc_worker(&self) -> anyhow::Result<Infallible> {
     285            0 :         let mut interval =
     286            0 :             tokio::time::interval(self.config.gc_interval / (self.cache.shards().len()) as u32);
     287              :         loop {
     288            0 :             interval.tick().await;
     289            0 :             if self.cache.len() < self.config.size {
     290              :                 // If there are not too many entries, wait until the next gc cycle.
     291            0 :                 continue;
     292            0 :             }
     293            0 :             self.gc();
     294              :         }
     295              :     }
     296              : 
     297            0 :     fn gc(&self) {
     298            0 :         let shard = thread_rng().gen_range(0..self.project2ep.shards().len());
     299            0 :         debug!(shard, "project_info_cache: performing epoch reclamation");
     300              : 
     301              :         // acquire a random shard lock
     302            0 :         let mut removed = 0;
     303            0 :         let shard = self.project2ep.shards()[shard].write();
     304            0 :         for (_, endpoints) in shard.iter() {
     305            0 :             for endpoint in endpoints.get() {
     306            0 :                 self.cache.remove(endpoint);
     307            0 :                 removed += 1;
     308            0 :             }
     309              :         }
     310              :         // We can drop this shard only after making sure that all endpoints are removed.
     311            0 :         drop(shard);
     312            0 :         info!("project_info_cache: removed {removed} endpoints");
     313            0 :     }
     314              : }
     315              : 
     316              : /// Lookup info for project info cache.
     317              : /// This is used to invalidate cache entries.
     318              : pub(crate) struct CachedLookupInfo {
     319              :     /// Search by this key.
     320              :     endpoint_id: EndpointIdInt,
     321              :     lookup_type: LookupType,
     322              : }
     323              : 
     324              : impl CachedLookupInfo {
     325            4 :     pub(self) fn new_role_secret(endpoint_id: EndpointIdInt, role_name: RoleNameInt) -> Self {
     326            4 :         Self {
     327            4 :             endpoint_id,
     328            4 :             lookup_type: LookupType::RoleSecret(role_name),
     329            4 :         }
     330            4 :     }
     331            1 :     pub(self) fn new_allowed_ips(endpoint_id: EndpointIdInt) -> Self {
     332            1 :         Self {
     333            1 :             endpoint_id,
     334            1 :             lookup_type: LookupType::AllowedIps,
     335            1 :         }
     336            1 :     }
     337              : }
     338              : 
     339              : enum LookupType {
     340              :     RoleSecret(RoleNameInt),
     341              :     AllowedIps,
     342              : }
     343              : 
     344              : impl Cache for ProjectInfoCacheImpl {
     345              :     type Key = SmolStr;
     346              :     // Value is not really used here, but we need to specify it.
     347              :     type Value = SmolStr;
     348              : 
     349              :     type LookupInfo<Key> = CachedLookupInfo;
     350              : 
     351            0 :     fn invalidate(&self, key: &Self::LookupInfo<SmolStr>) {
     352            0 :         match &key.lookup_type {
     353            0 :             LookupType::RoleSecret(role_name) => {
     354            0 :                 if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
     355            0 :                     endpoint_info.invalidate_role_secret(*role_name);
     356            0 :                 }
     357              :             }
     358              :             LookupType::AllowedIps => {
     359            0 :                 if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
     360            0 :                     endpoint_info.invalidate_allowed_ips();
     361            0 :                 }
     362              :             }
     363              :         }
     364            0 :     }
     365              : }
     366              : 
     #[cfg(test)]
     mod tests {
         use super::*;
         use crate::scram::ServerSecret;
         use crate::ProjectId;

         /// Options used by every test: two endpoints, two roles, one-second TTL.
         fn small_cache_options() -> ProjectInfoCacheOptions {
             ProjectInfoCacheOptions {
                 size: 2,
                 max_roles: 2,
                 ttl: Duration::from_secs(1),
                 gc_interval: Duration::from_secs(600),
             }
         }

         /// A mock SCRAM secret built from 32 copies of `fill`.
         fn scram_secret(fill: u8) -> Option<AuthSecret> {
             Some(AuthSecret::Scram(ServerSecret::mock([fill; 32])))
         }

         /// The two-address allow list shared by the tests.
         fn sample_allowed_ips() -> Arc<Vec<IpPattern>> {
             Arc::new(vec![
                 "127.0.0.1".parse().unwrap(),
                 "127.0.0.2".parse().unwrap(),
             ])
         }

         #[tokio::test]
         async fn test_project_info_cache_settings() {
             tokio::time::pause();
             let cache = ProjectInfoCacheImpl::new(small_cache_options());
             let project_id: ProjectId = "project".into();
             let endpoint_id: EndpointId = "endpoint".into();
             let user1: RoleName = "user1".into();
             let user2: RoleName = "user2".into();
             let secret1 = scram_secret(1);
             let secret2: Option<AuthSecret> = None;
             let allowed_ips = sample_allowed_ips();

             for (role, secret) in [(&user1, &secret1), (&user2, &secret2)] {
                 cache.insert_role_secret(
                     (&project_id).into(),
                     (&endpoint_id).into(),
                     role.into(),
                     secret.clone(),
                 );
             }
             cache.insert_allowed_ips(
                 (&project_id).into(),
                 (&endpoint_id).into(),
                 allowed_ips.clone(),
             );

             // Both roles fit within `max_roles` and are served from the cache.
             for (role, expected) in [(&user1, &secret1), (&user2, &secret2)] {
                 let cached = cache.get_role_secret(&endpoint_id, role).unwrap();
                 assert!(cached.cached());
                 assert_eq!(&cached.value, expected);
             }

             // Shouldn't add more than 2 roles.
             let user3: RoleName = "user3".into();
             let secret3 = scram_secret(3);
             cache.insert_role_secret(
                 (&project_id).into(),
                 (&endpoint_id).into(),
                 (&user3).into(),
                 secret3.clone(),
             );
             assert!(cache.get_role_secret(&endpoint_id, &user3).is_none());

             let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
             assert!(cached.cached());
             assert_eq!(cached.value, allowed_ips);

             // Past the TTL nothing is served any more.
             tokio::time::advance(Duration::from_secs(2)).await;
             assert!(cache.get_role_secret(&endpoint_id, &user1).is_none());
             assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
             assert!(cache.get_allowed_ips(&endpoint_id).is_none());
         }

         #[tokio::test]
         async fn test_project_info_cache_invalidations() {
             tokio::time::pause();
             let cache = Arc::new(ProjectInfoCacheImpl::new(small_cache_options()));
             cache.increment_active_listeners().await;
             tokio::time::advance(Duration::from_secs(2)).await;

             let project_id: ProjectId = "project".into();
             let endpoint_id: EndpointId = "endpoint".into();
             let user1: RoleName = "user1".into();
             let user2: RoleName = "user2".into();
             let secret1 = scram_secret(1);
             let secret2 = scram_secret(2);
             let allowed_ips = sample_allowed_ips();

             for (role, secret) in [(&user1, &secret1), (&user2, &secret2)] {
                 cache.insert_role_secret(
                     (&project_id).into(),
                     (&endpoint_id).into(),
                     role.into(),
                     secret.clone(),
                 );
             }
             cache.insert_allowed_ips(
                 (&project_id).into(),
                 (&endpoint_id).into(),
                 allowed_ips.clone(),
             );

             tokio::time::advance(Duration::from_secs(2)).await;
             // Nothing should be invalidated.

             let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
             // TTL is disabled, so it should be impossible to invalidate this value.
             assert!(!cached.cached());
             assert_eq!(cached.value, secret1);

             cached.invalidate(); // Shouldn't do anything.
             let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
             assert_eq!(cached.value, secret1);

             let cached = cache.get_role_secret(&endpoint_id, &user2).unwrap();
             assert!(!cached.cached());
             assert_eq!(cached.value, secret2);

             // The only way to invalidate this value is to invalidate via the api.
             cache.invalidate_role_secret_for_project((&project_id).into(), (&user2).into());
             assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());

             let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
             assert!(!cached.cached());
             assert_eq!(cached.value, allowed_ips);
         }

         #[tokio::test]
         async fn test_increment_active_listeners_invalidate_added_before() {
             tokio::time::pause();
             let cache = Arc::new(ProjectInfoCacheImpl::new(small_cache_options()));

             let project_id: ProjectId = "project".into();
             let endpoint_id: EndpointId = "endpoint".into();
             let user1: RoleName = "user1".into();
             let user2: RoleName = "user2".into();
             let secret1 = scram_secret(1);
             let secret2 = scram_secret(2);
             let allowed_ips = sample_allowed_ips();

             // First role goes in before any listener exists, second one shortly after.
             cache.insert_role_secret(
                 (&project_id).into(),
                 (&endpoint_id).into(),
                 (&user1).into(),
                 secret1.clone(),
             );
             cache.increment_active_listeners().await;
             tokio::time::advance(Duration::from_millis(100)).await;
             cache.insert_role_secret(
                 (&project_id).into(),
                 (&endpoint_id).into(),
                 (&user2).into(),
                 secret2.clone(),
             );

             // Added before ttl was disabled + ttl should be still cached.
             for role in [&user1, &user2] {
                 let cached = cache.get_role_secret(&endpoint_id, role).unwrap();
                 assert!(cached.cached());
             }

             tokio::time::advance(Duration::from_secs(1)).await;
             // Added before ttl was disabled + ttl should expire.
             assert!(cache.get_role_secret(&endpoint_id, &user1).is_none());
             assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());

             // Added after ttl was disabled + ttl should not be cached.
             cache.insert_allowed_ips(
                 (&project_id).into(),
                 (&endpoint_id).into(),
                 allowed_ips.clone(),
             );
             let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
             assert!(!cached.cached());

             tokio::time::advance(Duration::from_secs(1)).await;
             // Added before ttl was disabled + ttl still should expire.
             assert!(cache.get_role_secret(&endpoint_id, &user1).is_none());
             assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
             // Shouldn't be invalidated.

             let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
             assert!(!cached.cached());
             assert_eq!(cached.value, allowed_ips);
         }
     }
        

Generated by: LCOV version 2.1-beta