LCOV - code coverage report
Current view: top level - proxy/src/cache - project_info.rs (source / functions) Coverage Total Hit
Test: f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info Lines: 85.7 % 434 372
Test Date: 2024-09-19 20:36:02 Functions: 75.0 % 36 27

            Line data    Source code
       1              : use std::{
       2              :     collections::HashSet,
       3              :     convert::Infallible,
       4              :     sync::{atomic::AtomicU64, Arc},
       5              :     time::Duration,
       6              : };
       7              : 
       8              : use async_trait::async_trait;
       9              : use dashmap::DashMap;
      10              : use rand::{thread_rng, Rng};
      11              : use smol_str::SmolStr;
      12              : use tokio::sync::Mutex;
      13              : use tokio::time::Instant;
      14              : use tracing::{debug, info};
      15              : 
      16              : use crate::{
      17              :     auth::IpPattern,
      18              :     config::ProjectInfoCacheOptions,
      19              :     console::AuthSecret,
      20              :     intern::{EndpointIdInt, ProjectIdInt, RoleNameInt},
      21              :     EndpointId, RoleName,
      22              : };
      23              : 
      24              : use super::{Cache, Cached};
      25              : 
      26              : #[async_trait]
      27              : pub(crate) trait ProjectInfoCache {
      28              :     fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt);
      29              :     fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
      30              :     async fn decrement_active_listeners(&self);
      31              :     async fn increment_active_listeners(&self);
      32              : }
      33              : 
      34              : struct Entry<T> {
      35              :     created_at: Instant,
      36              :     value: T,
      37              : }
      38              : 
      39              : impl<T> Entry<T> {
      40            9 :     pub(crate) fn new(value: T) -> Self {
      41            9 :         Self {
      42            9 :             created_at: Instant::now(),
      43            9 :             value,
      44            9 :         }
      45            9 :     }
      46              : }
      47              : 
      48              : impl<T> From<T> for Entry<T> {
      49            9 :     fn from(value: T) -> Self {
      50            9 :         Self::new(value)
      51            9 :     }
      52              : }
      53              : 
      54              : #[derive(Default)]
      55              : struct EndpointInfo {
      56              :     secret: std::collections::HashMap<RoleNameInt, Entry<Option<AuthSecret>>>,
      57              :     allowed_ips: Option<Entry<Arc<Vec<IpPattern>>>>,
      58              : }
      59              : 
      60              : impl EndpointInfo {
      61           11 :     fn check_ignore_cache(ignore_cache_since: Option<Instant>, created_at: Instant) -> bool {
      62           11 :         match ignore_cache_since {
      63            3 :             None => false,
      64            8 :             Some(t) => t < created_at,
      65              :         }
      66           11 :     }
      67           15 :     pub(crate) fn get_role_secret(
      68           15 :         &self,
      69           15 :         role_name: RoleNameInt,
      70           15 :         valid_since: Instant,
      71           15 :         ignore_cache_since: Option<Instant>,
      72           15 :     ) -> Option<(Option<AuthSecret>, bool)> {
      73           15 :         if let Some(secret) = self.secret.get(&role_name) {
      74           13 :             if valid_since < secret.created_at {
      75            7 :                 return Some((
      76            7 :                     secret.value.clone(),
      77            7 :                     Self::check_ignore_cache(ignore_cache_since, secret.created_at),
      78            7 :                 ));
      79            6 :             }
      80            2 :         }
      81            8 :         None
      82           15 :     }
      83              : 
      84            5 :     pub(crate) fn get_allowed_ips(
      85            5 :         &self,
      86            5 :         valid_since: Instant,
      87            5 :         ignore_cache_since: Option<Instant>,
      88            5 :     ) -> Option<(Arc<Vec<IpPattern>>, bool)> {
      89            5 :         if let Some(allowed_ips) = &self.allowed_ips {
      90            5 :             if valid_since < allowed_ips.created_at {
      91            4 :                 return Some((
      92            4 :                     allowed_ips.value.clone(),
      93            4 :                     Self::check_ignore_cache(ignore_cache_since, allowed_ips.created_at),
      94            4 :                 ));
      95            1 :             }
      96            0 :         }
      97            1 :         None
      98            5 :     }
      99            0 :     pub(crate) fn invalidate_allowed_ips(&mut self) {
     100            0 :         self.allowed_ips = None;
     101            0 :     }
     102            1 :     pub(crate) fn invalidate_role_secret(&mut self, role_name: RoleNameInt) {
     103            1 :         self.secret.remove(&role_name);
     104            1 :     }
     105              : }
     106              : 
     107              : /// Cache for project info.
     108              : /// This is used to cache auth data for endpoints.
     109              : /// Invalidation is done by console notifications or by TTL (if console notifications are disabled).
     110              : ///
     111              : /// We also store endpoint-to-project mapping in the cache, to be able to access per-endpoint data.
     112              : /// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
     113              : /// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
     114              : pub struct ProjectInfoCacheImpl {
     115              :     cache: DashMap<EndpointIdInt, EndpointInfo>,
     116              : 
     117              :     project2ep: DashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
     118              :     config: ProjectInfoCacheOptions,
     119              : 
     120              :     start_time: Instant,
     121              :     ttl_disabled_since_us: AtomicU64,
     122              :     active_listeners_lock: Mutex<usize>,
     123              : }
     124              : 
     125              : #[async_trait]
     126              : impl ProjectInfoCache for ProjectInfoCacheImpl {
     127            0 :     fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt) {
     128            0 :         info!("invalidating allowed ips for project `{}`", project_id);
     129            0 :         let endpoints = self
     130            0 :             .project2ep
     131            0 :             .get(&project_id)
     132            0 :             .map(|kv| kv.value().clone())
     133            0 :             .unwrap_or_default();
     134            0 :         for endpoint_id in endpoints {
     135            0 :             if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
     136            0 :                 endpoint_info.invalidate_allowed_ips();
     137            0 :             }
     138              :         }
     139            0 :     }
     140            1 :     fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt) {
     141            1 :         info!(
     142            0 :             "invalidating role secret for project_id `{}` and role_name `{}`",
     143              :             project_id, role_name,
     144              :         );
     145            1 :         let endpoints = self
     146            1 :             .project2ep
     147            1 :             .get(&project_id)
     148            1 :             .map(|kv| kv.value().clone())
     149            1 :             .unwrap_or_default();
     150            2 :         for endpoint_id in endpoints {
     151            1 :             if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
     152            1 :                 endpoint_info.invalidate_role_secret(role_name);
     153            1 :             }
     154              :         }
     155            1 :     }
     156            0 :     async fn decrement_active_listeners(&self) {
     157            0 :         let mut listeners_guard = self.active_listeners_lock.lock().await;
     158            0 :         if *listeners_guard == 0 {
     159            0 :             tracing::error!("active_listeners count is already 0, something is broken");
     160            0 :             return;
     161            0 :         }
     162            0 :         *listeners_guard -= 1;
     163            0 :         if *listeners_guard == 0 {
     164            0 :             self.ttl_disabled_since_us
     165            0 :                 .store(u64::MAX, std::sync::atomic::Ordering::SeqCst);
     166            0 :         }
     167            0 :     }
     168              : 
     169            2 :     async fn increment_active_listeners(&self) {
     170            2 :         let mut listeners_guard = self.active_listeners_lock.lock().await;
     171            2 :         *listeners_guard += 1;
     172            2 :         if *listeners_guard == 1 {
     173            2 :             let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64;
     174            2 :             self.ttl_disabled_since_us
     175            2 :                 .store(new_ttl, std::sync::atomic::Ordering::SeqCst);
     176            2 :         }
     177            4 :     }
     178              : }
     179              : 
     180              : impl ProjectInfoCacheImpl {
     181            3 :     pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
     182            3 :         Self {
     183            3 :             cache: DashMap::new(),
     184            3 :             project2ep: DashMap::new(),
     185            3 :             config,
     186            3 :             ttl_disabled_since_us: AtomicU64::new(u64::MAX),
     187            3 :             start_time: Instant::now(),
     188            3 :             active_listeners_lock: Mutex::new(0),
     189            3 :         }
     190            3 :     }
     191              : 
     192           15 :     pub(crate) fn get_role_secret(
     193           15 :         &self,
     194           15 :         endpoint_id: &EndpointId,
     195           15 :         role_name: &RoleName,
     196           15 :     ) -> Option<Cached<&Self, Option<AuthSecret>>> {
     197           15 :         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
     198           15 :         let role_name = RoleNameInt::get(role_name)?;
     199           15 :         let (valid_since, ignore_cache_since) = self.get_cache_times();
     200           15 :         let endpoint_info = self.cache.get(&endpoint_id)?;
     201            7 :         let (value, ignore_cache) =
     202           15 :             endpoint_info.get_role_secret(role_name, valid_since, ignore_cache_since)?;
     203            7 :         if !ignore_cache {
     204            4 :             let cached = Cached {
     205            4 :                 token: Some((
     206            4 :                     self,
     207            4 :                     CachedLookupInfo::new_role_secret(endpoint_id, role_name),
     208            4 :                 )),
     209            4 :                 value,
     210            4 :             };
     211            4 :             return Some(cached);
     212            3 :         }
     213            3 :         Some(Cached::new_uncached(value))
     214           15 :     }
     215            5 :     pub(crate) fn get_allowed_ips(
     216            5 :         &self,
     217            5 :         endpoint_id: &EndpointId,
     218            5 :     ) -> Option<Cached<&Self, Arc<Vec<IpPattern>>>> {
     219            5 :         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
     220            5 :         let (valid_since, ignore_cache_since) = self.get_cache_times();
     221            5 :         let endpoint_info = self.cache.get(&endpoint_id)?;
     222            5 :         let value = endpoint_info.get_allowed_ips(valid_since, ignore_cache_since);
     223            5 :         let (value, ignore_cache) = value?;
     224            4 :         if !ignore_cache {
     225            1 :             let cached = Cached {
     226            1 :                 token: Some((self, CachedLookupInfo::new_allowed_ips(endpoint_id))),
     227            1 :                 value,
     228            1 :             };
     229            1 :             return Some(cached);
     230            3 :         }
     231            3 :         Some(Cached::new_uncached(value))
     232            5 :     }
     233            7 :     pub(crate) fn insert_role_secret(
     234            7 :         &self,
     235            7 :         project_id: ProjectIdInt,
     236            7 :         endpoint_id: EndpointIdInt,
     237            7 :         role_name: RoleNameInt,
     238            7 :         secret: Option<AuthSecret>,
     239            7 :     ) {
     240            7 :         if self.cache.len() >= self.config.size {
     241              :             // If there are too many entries, wait until the next gc cycle.
     242            0 :             return;
     243            7 :         }
     244            7 :         self.insert_project2endpoint(project_id, endpoint_id);
     245            7 :         let mut entry = self.cache.entry(endpoint_id).or_default();
     246            7 :         if entry.secret.len() < self.config.max_roles {
     247            6 :             entry.secret.insert(role_name, secret.into());
     248            6 :         }
     249            7 :     }
     250            3 :     pub(crate) fn insert_allowed_ips(
     251            3 :         &self,
     252            3 :         project_id: ProjectIdInt,
     253            3 :         endpoint_id: EndpointIdInt,
     254            3 :         allowed_ips: Arc<Vec<IpPattern>>,
     255            3 :     ) {
     256            3 :         if self.cache.len() >= self.config.size {
     257              :             // If there are too many entries, wait until the next gc cycle.
     258            0 :             return;
     259            3 :         }
     260            3 :         self.insert_project2endpoint(project_id, endpoint_id);
     261            3 :         self.cache.entry(endpoint_id).or_default().allowed_ips = Some(allowed_ips.into());
     262            3 :     }
     263           10 :     fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
     264           10 :         if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) {
     265            7 :             endpoints.insert(endpoint_id);
     266            7 :         } else {
     267            3 :             self.project2ep
     268            3 :                 .insert(project_id, HashSet::from([endpoint_id]));
     269            3 :         }
     270           10 :     }
     271           20 :     fn get_cache_times(&self) -> (Instant, Option<Instant>) {
     272           20 :         let mut valid_since = Instant::now() - self.config.ttl;
     273           20 :         // Only ignore cache if ttl is disabled.
     274           20 :         let ttl_disabled_since_us = self
     275           20 :             .ttl_disabled_since_us
     276           20 :             .load(std::sync::atomic::Ordering::Relaxed);
     277           20 :         let ignore_cache_since = if ttl_disabled_since_us == u64::MAX {
     278            7 :             None
     279              :         } else {
     280           13 :             let ignore_cache_since = self.start_time + Duration::from_micros(ttl_disabled_since_us);
     281           13 :             // We are fine if entry is not older than ttl or was added before we are getting notifications.
     282           13 :             valid_since = valid_since.min(ignore_cache_since);
     283           13 :             Some(ignore_cache_since)
     284              :         };
     285           20 :         (valid_since, ignore_cache_since)
     286           20 :     }
     287              : 
     288            0 :     pub async fn gc_worker(&self) -> anyhow::Result<Infallible> {
     289            0 :         let mut interval =
     290            0 :             tokio::time::interval(self.config.gc_interval / (self.cache.shards().len()) as u32);
     291              :         loop {
     292            0 :             interval.tick().await;
     293            0 :             if self.cache.len() < self.config.size {
     294              :                 // If there are not too many entries, wait until the next gc cycle.
     295            0 :                 continue;
     296            0 :             }
     297            0 :             self.gc();
     298              :         }
     299              :     }
     300              : 
     301            0 :     fn gc(&self) {
     302            0 :         let shard = thread_rng().gen_range(0..self.project2ep.shards().len());
     303            0 :         debug!(shard, "project_info_cache: performing epoch reclamation");
     304              : 
     305              :         // acquire a random shard lock
     306            0 :         let mut removed = 0;
     307            0 :         let shard = self.project2ep.shards()[shard].write();
     308            0 :         for (_, endpoints) in shard.iter() {
     309            0 :             for endpoint in endpoints.get() {
     310            0 :                 self.cache.remove(endpoint);
     311            0 :                 removed += 1;
     312            0 :             }
     313              :         }
     314              :         // We can drop this shard only after making sure that all endpoints are removed.
     315            0 :         drop(shard);
     316            0 :         info!("project_info_cache: removed {removed} endpoints");
     317            0 :     }
     318              : }
     319              : 
     320              : /// Lookup info for project info cache.
     321              : /// This is used to invalidate cache entries.
     322              : pub(crate) struct CachedLookupInfo {
     323              :     /// Search by this key.
     324              :     endpoint_id: EndpointIdInt,
     325              :     lookup_type: LookupType,
     326              : }
     327              : 
     328              : impl CachedLookupInfo {
     329            4 :     pub(self) fn new_role_secret(endpoint_id: EndpointIdInt, role_name: RoleNameInt) -> Self {
     330            4 :         Self {
     331            4 :             endpoint_id,
     332            4 :             lookup_type: LookupType::RoleSecret(role_name),
     333            4 :         }
     334            4 :     }
     335            1 :     pub(self) fn new_allowed_ips(endpoint_id: EndpointIdInt) -> Self {
     336            1 :         Self {
     337            1 :             endpoint_id,
     338            1 :             lookup_type: LookupType::AllowedIps,
     339            1 :         }
     340            1 :     }
     341              : }
     342              : 
     343              : enum LookupType {
     344              :     RoleSecret(RoleNameInt),
     345              :     AllowedIps,
     346              : }
     347              : 
     348              : impl Cache for ProjectInfoCacheImpl {
     349              :     type Key = SmolStr;
     350              :     // Value is not really used here, but we need to specify it.
     351              :     type Value = SmolStr;
     352              : 
     353              :     type LookupInfo<Key> = CachedLookupInfo;
     354              : 
     355            0 :     fn invalidate(&self, key: &Self::LookupInfo<SmolStr>) {
     356            0 :         match &key.lookup_type {
     357            0 :             LookupType::RoleSecret(role_name) => {
     358            0 :                 if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
     359            0 :                     endpoint_info.invalidate_role_secret(*role_name);
     360            0 :                 }
     361              :             }
     362              :             LookupType::AllowedIps => {
     363            0 :                 if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
     364            0 :                     endpoint_info.invalidate_allowed_ips();
     365            0 :                 }
     366              :             }
     367              :         }
     368            0 :     }
     369              : }
     370              : 
     371              : #[cfg(test)]
     372              : mod tests {
     373              :     use super::*;
     374              :     use crate::{scram::ServerSecret, ProjectId};
     375              : 
     376              :     #[tokio::test]
     377            1 :     async fn test_project_info_cache_settings() {
     378            1 :         tokio::time::pause();
     379            1 :         let cache = ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
     380            1 :             size: 2,
     381            1 :             max_roles: 2,
     382            1 :             ttl: Duration::from_secs(1),
     383            1 :             gc_interval: Duration::from_secs(600),
     384            1 :         });
     385            1 :         let project_id: ProjectId = "project".into();
     386            1 :         let endpoint_id: EndpointId = "endpoint".into();
     387            1 :         let user1: RoleName = "user1".into();
     388            1 :         let user2: RoleName = "user2".into();
     389            1 :         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
     390            1 :         let secret2 = None;
     391            1 :         let allowed_ips = Arc::new(vec![
     392            1 :             "127.0.0.1".parse().unwrap(),
     393            1 :             "127.0.0.2".parse().unwrap(),
     394            1 :         ]);
     395            1 :         cache.insert_role_secret(
     396            1 :             (&project_id).into(),
     397            1 :             (&endpoint_id).into(),
     398            1 :             (&user1).into(),
     399            1 :             secret1.clone(),
     400            1 :         );
     401            1 :         cache.insert_role_secret(
     402            1 :             (&project_id).into(),
     403            1 :             (&endpoint_id).into(),
     404            1 :             (&user2).into(),
     405            1 :             secret2.clone(),
     406            1 :         );
     407            1 :         cache.insert_allowed_ips(
     408            1 :             (&project_id).into(),
     409            1 :             (&endpoint_id).into(),
     410            1 :             allowed_ips.clone(),
     411            1 :         );
     412            1 : 
     413            1 :         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
     414            1 :         assert!(cached.cached());
     415            1 :         assert_eq!(cached.value, secret1);
     416            1 :         let cached = cache.get_role_secret(&endpoint_id, &user2).unwrap();
     417            1 :         assert!(cached.cached());
     418            1 :         assert_eq!(cached.value, secret2);
     419            1 : 
     420            1 :         // Shouldn't add more than 2 roles.
     421            1 :         let user3: RoleName = "user3".into();
     422            1 :         let secret3 = Some(AuthSecret::Scram(ServerSecret::mock([3; 32])));
     423            1 :         cache.insert_role_secret(
     424            1 :             (&project_id).into(),
     425            1 :             (&endpoint_id).into(),
     426            1 :             (&user3).into(),
     427            1 :             secret3.clone(),
     428            1 :         );
     429            1 :         assert!(cache.get_role_secret(&endpoint_id, &user3).is_none());
     430            1 : 
     431            1 :         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
     432            1 :         assert!(cached.cached());
     433            1 :         assert_eq!(cached.value, allowed_ips);
     434            1 : 
     435            1 :         tokio::time::advance(Duration::from_secs(2)).await;
     436            1 :         let cached = cache.get_role_secret(&endpoint_id, &user1);
     437            1 :         assert!(cached.is_none());
     438            1 :         let cached = cache.get_role_secret(&endpoint_id, &user2);
     439            1 :         assert!(cached.is_none());
     440            1 :         let cached = cache.get_allowed_ips(&endpoint_id);
     441            1 :         assert!(cached.is_none());
     442            1 :     }
     443              : 
     444              :     #[tokio::test]
     445            1 :     async fn test_project_info_cache_invalidations() {
     446            1 :         tokio::time::pause();
     447            1 :         let cache = Arc::new(ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
     448            1 :             size: 2,
     449            1 :             max_roles: 2,
     450            1 :             ttl: Duration::from_secs(1),
     451            1 :             gc_interval: Duration::from_secs(600),
     452            1 :         }));
     453            1 :         cache.clone().increment_active_listeners().await;
     454            1 :         tokio::time::advance(Duration::from_secs(2)).await;
     455            1 : 
     456            1 :         let project_id: ProjectId = "project".into();
     457            1 :         let endpoint_id: EndpointId = "endpoint".into();
     458            1 :         let user1: RoleName = "user1".into();
     459            1 :         let user2: RoleName = "user2".into();
     460            1 :         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
     461            1 :         let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
     462            1 :         let allowed_ips = Arc::new(vec![
     463            1 :             "127.0.0.1".parse().unwrap(),
     464            1 :             "127.0.0.2".parse().unwrap(),
     465            1 :         ]);
     466            1 :         cache.insert_role_secret(
     467            1 :             (&project_id).into(),
     468            1 :             (&endpoint_id).into(),
     469            1 :             (&user1).into(),
     470            1 :             secret1.clone(),
     471            1 :         );
     472            1 :         cache.insert_role_secret(
     473            1 :             (&project_id).into(),
     474            1 :             (&endpoint_id).into(),
     475            1 :             (&user2).into(),
     476            1 :             secret2.clone(),
     477            1 :         );
     478            1 :         cache.insert_allowed_ips(
     479            1 :             (&project_id).into(),
     480            1 :             (&endpoint_id).into(),
     481            1 :             allowed_ips.clone(),
     482            1 :         );
     483            1 : 
     484            1 :         tokio::time::advance(Duration::from_secs(2)).await;
     485            1 :         // Nothing should be invalidated.
     486            1 : 
     487            1 :         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
     488            1 :         // TTL is disabled, so it should be impossible to invalidate this value.
     489            1 :         assert!(!cached.cached());
     490            1 :         assert_eq!(cached.value, secret1);
     491            1 : 
     492            1 :         cached.invalidate(); // Shouldn't do anything.
     493            1 :         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
     494            1 :         assert_eq!(cached.value, secret1);
     495            1 : 
     496            1 :         let cached = cache.get_role_secret(&endpoint_id, &user2).unwrap();
     497            1 :         assert!(!cached.cached());
     498            1 :         assert_eq!(cached.value, secret2);
     499            1 : 
     500            1 :         // The only way to invalidate this value is to invalidate via the api.
     501            1 :         cache.invalidate_role_secret_for_project((&project_id).into(), (&user2).into());
     502            1 :         assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
     503            1 : 
     504            1 :         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
     505            1 :         assert!(!cached.cached());
     506            1 :         assert_eq!(cached.value, allowed_ips);
     507            1 :     }
     508              : 
     509              :     #[tokio::test]
     510            1 :     async fn test_increment_active_listeners_invalidate_added_before() {
     511            1 :         tokio::time::pause();
     512            1 :         let cache = Arc::new(ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
     513            1 :             size: 2,
     514            1 :             max_roles: 2,
     515            1 :             ttl: Duration::from_secs(1),
     516            1 :             gc_interval: Duration::from_secs(600),
     517            1 :         }));
     518            1 : 
     519            1 :         let project_id: ProjectId = "project".into();
     520            1 :         let endpoint_id: EndpointId = "endpoint".into();
     521            1 :         let user1: RoleName = "user1".into();
     522            1 :         let user2: RoleName = "user2".into();
     523            1 :         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
     524            1 :         let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
     525            1 :         let allowed_ips = Arc::new(vec![
     526            1 :             "127.0.0.1".parse().unwrap(),
     527            1 :             "127.0.0.2".parse().unwrap(),
     528            1 :         ]);
     529            1 :         cache.insert_role_secret(
     530            1 :             (&project_id).into(),
     531            1 :             (&endpoint_id).into(),
     532            1 :             (&user1).into(),
     533            1 :             secret1.clone(),
     534            1 :         );
     535            1 :         cache.clone().increment_active_listeners().await;
     536            1 :         tokio::time::advance(Duration::from_millis(100)).await;
     537            1 :         cache.insert_role_secret(
     538            1 :             (&project_id).into(),
     539            1 :             (&endpoint_id).into(),
     540            1 :             (&user2).into(),
     541            1 :             secret2.clone(),
     542            1 :         );
     543            1 : 
     544            1 :         // Added before ttl was disabled + ttl should be still cached.
     545            1 :         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
     546            1 :         assert!(cached.cached());
     547            1 :         let cached = cache.get_role_secret(&endpoint_id, &user2).unwrap();
     548            1 :         assert!(cached.cached());
     549            1 : 
     550            1 :         tokio::time::advance(Duration::from_secs(1)).await;
     551            1 :         // Added before ttl was disabled + ttl should expire.
     552            1 :         assert!(cache.get_role_secret(&endpoint_id, &user1).is_none());
     553            1 :         assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
     554            1 : 
     555            1 :         // Added after ttl was disabled + ttl should not be cached.
     556            1 :         cache.insert_allowed_ips(
     557            1 :             (&project_id).into(),
     558            1 :             (&endpoint_id).into(),
     559            1 :             allowed_ips.clone(),
     560            1 :         );
     561            1 :         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
     562            1 :         assert!(!cached.cached());
     563            1 : 
     564            1 :         tokio::time::advance(Duration::from_secs(1)).await;
     565            1 :         // Added before ttl was disabled + ttl still should expire.
     566            1 :         assert!(cache.get_role_secret(&endpoint_id, &user1).is_none());
     567            1 :         assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
     568            1 :         // Shouldn't be invalidated.
     569            1 : 
     570            1 :         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
     571            1 :         assert!(!cached.cached());
     572            1 :         assert_eq!(cached.value, allowed_ips);
     573            1 :     }
     574              : }
        

Generated by: LCOV version 2.1-beta