LCOV - code coverage report
Current view: top level - proxy/src/cache - project_info.rs (source / functions) Coverage Total Hit
Test: 465a86b0c1fda0069b3e0f6c1c126e6b635a1f72.info Lines: 85.7 % 434 372
Test Date: 2024-06-25 15:47:26 Functions: 75.0 % 36 27

            Line data    Source code
       1              : use std::{
       2              :     collections::HashSet,
       3              :     convert::Infallible,
       4              :     sync::{atomic::AtomicU64, Arc},
       5              :     time::Duration,
       6              : };
       7              : 
       8              : use async_trait::async_trait;
       9              : use dashmap::DashMap;
      10              : use rand::{thread_rng, Rng};
      11              : use smol_str::SmolStr;
      12              : use tokio::sync::Mutex;
      13              : use tokio::time::Instant;
      14              : use tracing::{debug, info};
      15              : 
      16              : use crate::{
      17              :     auth::IpPattern,
      18              :     config::ProjectInfoCacheOptions,
      19              :     console::AuthSecret,
      20              :     intern::{EndpointIdInt, ProjectIdInt, RoleNameInt},
      21              :     EndpointId, RoleName,
      22              : };
      23              : 
      24              : use super::{Cache, Cached};
      25              : 
      26              : #[async_trait]
      27              : pub trait ProjectInfoCache {
      28              :     fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt);
      29              :     fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
      30              :     async fn decrement_active_listeners(&self);
      31              :     async fn increment_active_listeners(&self);
      32              : }
      33              : 
      34              : struct Entry<T> {
      35              :     created_at: Instant,
      36              :     value: T,
      37              : }
      38              : 
      39              : impl<T> Entry<T> {
      40           18 :     pub fn new(value: T) -> Self {
      41           18 :         Self {
      42           18 :             created_at: Instant::now(),
      43           18 :             value,
      44           18 :         }
      45           18 :     }
      46              : }
      47              : 
      48              : impl<T> From<T> for Entry<T> {
      49           18 :     fn from(value: T) -> Self {
      50           18 :         Self::new(value)
      51           18 :     }
      52              : }
      53              : 
      54              : #[derive(Default)]
      55              : struct EndpointInfo {
      56              :     secret: std::collections::HashMap<RoleNameInt, Entry<Option<AuthSecret>>>,
      57              :     allowed_ips: Option<Entry<Arc<Vec<IpPattern>>>>,
      58              : }
      59              : 
      60              : impl EndpointInfo {
      61           22 :     fn check_ignore_cache(ignore_cache_since: Option<Instant>, created_at: Instant) -> bool {
      62           22 :         match ignore_cache_since {
      63            6 :             None => false,
      64           16 :             Some(t) => t < created_at,
      65              :         }
      66           22 :     }
      67           30 :     pub fn get_role_secret(
      68           30 :         &self,
      69           30 :         role_name: RoleNameInt,
      70           30 :         valid_since: Instant,
      71           30 :         ignore_cache_since: Option<Instant>,
      72           30 :     ) -> Option<(Option<AuthSecret>, bool)> {
      73           30 :         if let Some(secret) = self.secret.get(&role_name) {
      74           26 :             if valid_since < secret.created_at {
      75           14 :                 return Some((
      76           14 :                     secret.value.clone(),
      77           14 :                     Self::check_ignore_cache(ignore_cache_since, secret.created_at),
      78           14 :                 ));
      79           12 :             }
      80            4 :         }
      81           16 :         None
      82           30 :     }
      83              : 
      84           10 :     pub fn get_allowed_ips(
      85           10 :         &self,
      86           10 :         valid_since: Instant,
      87           10 :         ignore_cache_since: Option<Instant>,
      88           10 :     ) -> Option<(Arc<Vec<IpPattern>>, bool)> {
      89           10 :         if let Some(allowed_ips) = &self.allowed_ips {
      90           10 :             if valid_since < allowed_ips.created_at {
      91            8 :                 return Some((
      92            8 :                     allowed_ips.value.clone(),
      93            8 :                     Self::check_ignore_cache(ignore_cache_since, allowed_ips.created_at),
      94            8 :                 ));
      95            2 :             }
      96            0 :         }
      97            2 :         None
      98           10 :     }
      99            0 :     pub fn invalidate_allowed_ips(&mut self) {
     100            0 :         self.allowed_ips = None;
     101            0 :     }
     102            2 :     pub fn invalidate_role_secret(&mut self, role_name: RoleNameInt) {
     103            2 :         self.secret.remove(&role_name);
     104            2 :     }
     105              : }
     106              : 
     107              : /// Cache for project info.
     108              : /// This is used to cache auth data for endpoints.
     109              : /// Invalidation is done by console notifications or by TTL (if console notifications are disabled).
     110              : ///
     111              : /// We also store endpoint-to-project mapping in the cache, to be able to access per-endpoint data.
     112              : /// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
     113              : /// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
     114              : pub struct ProjectInfoCacheImpl {
     115              :     cache: DashMap<EndpointIdInt, EndpointInfo>,
     116              : 
     117              :     project2ep: DashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
     118              :     config: ProjectInfoCacheOptions,
     119              : 
     120              :     start_time: Instant,
     121              :     ttl_disabled_since_us: AtomicU64,
     122              :     active_listeners_lock: Mutex<usize>,
     123              : }
     124              : 
     125              : #[async_trait]
     126              : impl ProjectInfoCache for ProjectInfoCacheImpl {
     127            0 :     fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt) {
     128            0 :         info!("invalidating allowed ips for project `{}`", project_id);
     129            0 :         let endpoints = self
     130            0 :             .project2ep
     131            0 :             .get(&project_id)
     132            0 :             .map(|kv| kv.value().clone())
     133            0 :             .unwrap_or_default();
     134            0 :         for endpoint_id in endpoints {
     135            0 :             if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
     136            0 :                 endpoint_info.invalidate_allowed_ips();
     137            0 :             }
     138              :         }
     139            0 :     }
     140            2 :     fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt) {
     141            2 :         info!(
     142            0 :             "invalidating role secret for project_id `{}` and role_name `{}`",
     143              :             project_id, role_name,
     144              :         );
     145            2 :         let endpoints = self
     146            2 :             .project2ep
     147            2 :             .get(&project_id)
     148            2 :             .map(|kv| kv.value().clone())
     149            2 :             .unwrap_or_default();
     150            4 :         for endpoint_id in endpoints {
     151            2 :             if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
     152            2 :                 endpoint_info.invalidate_role_secret(role_name);
     153            2 :             }
     154              :         }
     155            2 :     }
     156            0 :     async fn decrement_active_listeners(&self) {
     157            0 :         let mut listeners_guard = self.active_listeners_lock.lock().await;
     158            0 :         if *listeners_guard == 0 {
     159            0 :             tracing::error!("active_listeners count is already 0, something is broken");
     160            0 :             return;
     161            0 :         }
     162            0 :         *listeners_guard -= 1;
     163            0 :         if *listeners_guard == 0 {
     164            0 :             self.ttl_disabled_since_us
     165            0 :                 .store(u64::MAX, std::sync::atomic::Ordering::SeqCst);
     166            0 :         }
     167            0 :     }
     168              : 
     169            4 :     async fn increment_active_listeners(&self) {
     170            4 :         let mut listeners_guard = self.active_listeners_lock.lock().await;
     171            4 :         *listeners_guard += 1;
     172            4 :         if *listeners_guard == 1 {
     173            4 :             let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64;
     174            4 :             self.ttl_disabled_since_us
     175            4 :                 .store(new_ttl, std::sync::atomic::Ordering::SeqCst);
     176            4 :         }
     177            4 :     }
     178              : }
     179              : 
     180              : impl ProjectInfoCacheImpl {
     181            6 :     pub fn new(config: ProjectInfoCacheOptions) -> Self {
     182            6 :         Self {
     183            6 :             cache: DashMap::new(),
     184            6 :             project2ep: DashMap::new(),
     185            6 :             config,
     186            6 :             ttl_disabled_since_us: AtomicU64::new(u64::MAX),
     187            6 :             start_time: Instant::now(),
     188            6 :             active_listeners_lock: Mutex::new(0),
     189            6 :         }
     190            6 :     }
     191              : 
     192           30 :     pub fn get_role_secret(
     193           30 :         &self,
     194           30 :         endpoint_id: &EndpointId,
     195           30 :         role_name: &RoleName,
     196           30 :     ) -> Option<Cached<&Self, Option<AuthSecret>>> {
     197           30 :         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
     198           30 :         let role_name = RoleNameInt::get(role_name)?;
     199           30 :         let (valid_since, ignore_cache_since) = self.get_cache_times();
     200           30 :         let endpoint_info = self.cache.get(&endpoint_id)?;
     201           14 :         let (value, ignore_cache) =
     202           30 :             endpoint_info.get_role_secret(role_name, valid_since, ignore_cache_since)?;
     203           14 :         if !ignore_cache {
     204            8 :             let cached = Cached {
     205            8 :                 token: Some((
     206            8 :                     self,
     207            8 :                     CachedLookupInfo::new_role_secret(endpoint_id, role_name),
     208            8 :                 )),
     209            8 :                 value,
     210            8 :             };
     211            8 :             return Some(cached);
     212            6 :         }
     213            6 :         Some(Cached::new_uncached(value))
     214           30 :     }
     215           10 :     pub fn get_allowed_ips(
     216           10 :         &self,
     217           10 :         endpoint_id: &EndpointId,
     218           10 :     ) -> Option<Cached<&Self, Arc<Vec<IpPattern>>>> {
     219           10 :         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
     220           10 :         let (valid_since, ignore_cache_since) = self.get_cache_times();
     221           10 :         let endpoint_info = self.cache.get(&endpoint_id)?;
     222           10 :         let value = endpoint_info.get_allowed_ips(valid_since, ignore_cache_since);
     223           10 :         let (value, ignore_cache) = value?;
     224            8 :         if !ignore_cache {
     225            2 :             let cached = Cached {
     226            2 :                 token: Some((self, CachedLookupInfo::new_allowed_ips(endpoint_id))),
     227            2 :                 value,
     228            2 :             };
     229            2 :             return Some(cached);
     230            6 :         }
     231            6 :         Some(Cached::new_uncached(value))
     232           10 :     }
     233           14 :     pub fn insert_role_secret(
     234           14 :         &self,
     235           14 :         project_id: ProjectIdInt,
     236           14 :         endpoint_id: EndpointIdInt,
     237           14 :         role_name: RoleNameInt,
     238           14 :         secret: Option<AuthSecret>,
     239           14 :     ) {
     240           14 :         if self.cache.len() >= self.config.size {
     241              :             // If there are too many entries, wait until the next gc cycle.
     242            0 :             return;
     243           14 :         }
     244           14 :         self.insert_project2endpoint(project_id, endpoint_id);
     245           14 :         let mut entry = self.cache.entry(endpoint_id).or_default();
     246           14 :         if entry.secret.len() < self.config.max_roles {
     247           12 :             entry.secret.insert(role_name, secret.into());
     248           12 :         }
     249           14 :     }
     250            6 :     pub fn insert_allowed_ips(
     251            6 :         &self,
     252            6 :         project_id: ProjectIdInt,
     253            6 :         endpoint_id: EndpointIdInt,
     254            6 :         allowed_ips: Arc<Vec<IpPattern>>,
     255            6 :     ) {
     256            6 :         if self.cache.len() >= self.config.size {
     257              :             // If there are too many entries, wait until the next gc cycle.
     258            0 :             return;
     259            6 :         }
     260            6 :         self.insert_project2endpoint(project_id, endpoint_id);
     261            6 :         self.cache.entry(endpoint_id).or_default().allowed_ips = Some(allowed_ips.into());
     262            6 :     }
     263           20 :     fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
     264           20 :         if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) {
     265           14 :             endpoints.insert(endpoint_id);
     266           14 :         } else {
     267            6 :             self.project2ep
     268            6 :                 .insert(project_id, HashSet::from([endpoint_id]));
     269            6 :         }
     270           20 :     }
     271           40 :     fn get_cache_times(&self) -> (Instant, Option<Instant>) {
     272           40 :         let mut valid_since = Instant::now() - self.config.ttl;
     273           40 :         // Only ignore cache if ttl is disabled.
     274           40 :         let ttl_disabled_since_us = self
     275           40 :             .ttl_disabled_since_us
     276           40 :             .load(std::sync::atomic::Ordering::Relaxed);
     277           40 :         let ignore_cache_since = if ttl_disabled_since_us != u64::MAX {
     278           26 :             let ignore_cache_since = self.start_time + Duration::from_micros(ttl_disabled_since_us);
     279           26 :             // We are fine if entry is not older than ttl or was added before we are getting notifications.
     280           26 :             valid_since = valid_since.min(ignore_cache_since);
     281           26 :             Some(ignore_cache_since)
     282              :         } else {
     283           14 :             None
     284              :         };
     285           40 :         (valid_since, ignore_cache_since)
     286           40 :     }
     287              : 
     288            0 :     pub async fn gc_worker(&self) -> anyhow::Result<Infallible> {
     289            0 :         let mut interval =
     290            0 :             tokio::time::interval(self.config.gc_interval / (self.cache.shards().len()) as u32);
     291              :         loop {
     292            0 :             interval.tick().await;
     293            0 :             if self.cache.len() < self.config.size {
     294              :                 // If there are not too many entries, wait until the next gc cycle.
     295            0 :                 continue;
     296            0 :             }
     297            0 :             self.gc();
     298              :         }
     299              :     }
     300              : 
     301            0 :     fn gc(&self) {
     302            0 :         let shard = thread_rng().gen_range(0..self.project2ep.shards().len());
     303            0 :         debug!(shard, "project_info_cache: performing epoch reclamation");
     304              : 
     305              :         // acquire a random shard lock
     306            0 :         let mut removed = 0;
     307            0 :         let shard = self.project2ep.shards()[shard].write();
     308            0 :         for (_, endpoints) in shard.iter() {
     309            0 :             for endpoint in endpoints.get().iter() {
     310            0 :                 self.cache.remove(endpoint);
     311            0 :                 removed += 1;
     312            0 :             }
     313              :         }
     314              :         // We can drop this shard only after making sure that all endpoints are removed.
     315            0 :         drop(shard);
     316            0 :         info!("project_info_cache: removed {removed} endpoints");
     317            0 :     }
     318              : }
     319              : 
     320              : /// Lookup info for project info cache.
     321              : /// This is used to invalidate cache entries.
     322              : pub struct CachedLookupInfo {
     323              :     /// Search by this key.
     324              :     endpoint_id: EndpointIdInt,
     325              :     lookup_type: LookupType,
     326              : }
     327              : 
     328              : impl CachedLookupInfo {
     329            8 :     pub(self) fn new_role_secret(endpoint_id: EndpointIdInt, role_name: RoleNameInt) -> Self {
     330            8 :         Self {
     331            8 :             endpoint_id,
     332            8 :             lookup_type: LookupType::RoleSecret(role_name),
     333            8 :         }
     334            8 :     }
     335            2 :     pub(self) fn new_allowed_ips(endpoint_id: EndpointIdInt) -> Self {
     336            2 :         Self {
     337            2 :             endpoint_id,
     338            2 :             lookup_type: LookupType::AllowedIps,
     339            2 :         }
     340            2 :     }
     341              : }
     342              : 
     343              : enum LookupType {
     344              :     RoleSecret(RoleNameInt),
     345              :     AllowedIps,
     346              : }
     347              : 
     348              : impl Cache for ProjectInfoCacheImpl {
     349              :     type Key = SmolStr;
     350              :     // Value is not really used here, but we need to specify it.
     351              :     type Value = SmolStr;
     352              : 
     353              :     type LookupInfo<Key> = CachedLookupInfo;
     354              : 
     355            0 :     fn invalidate(&self, key: &Self::LookupInfo<SmolStr>) {
     356            0 :         match &key.lookup_type {
     357            0 :             LookupType::RoleSecret(role_name) => {
     358            0 :                 if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
     359            0 :                     endpoint_info.invalidate_role_secret(*role_name);
     360            0 :                 }
     361              :             }
     362              :             LookupType::AllowedIps => {
     363            0 :                 if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
     364            0 :                     endpoint_info.invalidate_allowed_ips();
     365            0 :                 }
     366              :             }
     367              :         }
     368            0 :     }
     369              : }
     370              : 
     371              : #[cfg(test)]
     372              : mod tests {
     373              :     use super::*;
     374              :     use crate::{scram::ServerSecret, ProjectId};
     375              : 
     376              :     #[tokio::test]
     377            2 :     async fn test_project_info_cache_settings() {
     378            2 :         tokio::time::pause();
     379            2 :         let cache = ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
     380            2 :             size: 2,
     381            2 :             max_roles: 2,
     382            2 :             ttl: Duration::from_secs(1),
     383            2 :             gc_interval: Duration::from_secs(600),
     384            2 :         });
     385            2 :         let project_id: ProjectId = "project".into();
     386            2 :         let endpoint_id: EndpointId = "endpoint".into();
     387            2 :         let user1: RoleName = "user1".into();
     388            2 :         let user2: RoleName = "user2".into();
     389            2 :         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
     390            2 :         let secret2 = None;
     391            2 :         let allowed_ips = Arc::new(vec![
     392            2 :             "127.0.0.1".parse().unwrap(),
     393            2 :             "127.0.0.2".parse().unwrap(),
     394            2 :         ]);
     395            2 :         cache.insert_role_secret(
     396            2 :             (&project_id).into(),
     397            2 :             (&endpoint_id).into(),
     398            2 :             (&user1).into(),
     399            2 :             secret1.clone(),
     400            2 :         );
     401            2 :         cache.insert_role_secret(
     402            2 :             (&project_id).into(),
     403            2 :             (&endpoint_id).into(),
     404            2 :             (&user2).into(),
     405            2 :             secret2.clone(),
     406            2 :         );
     407            2 :         cache.insert_allowed_ips(
     408            2 :             (&project_id).into(),
     409            2 :             (&endpoint_id).into(),
     410            2 :             allowed_ips.clone(),
     411            2 :         );
     412            2 : 
     413            2 :         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
     414            2 :         assert!(cached.cached());
     415            2 :         assert_eq!(cached.value, secret1);
     416            2 :         let cached = cache.get_role_secret(&endpoint_id, &user2).unwrap();
     417            2 :         assert!(cached.cached());
     418            2 :         assert_eq!(cached.value, secret2);
     419            2 : 
     420            2 :         // Shouldn't add more than 2 roles.
     421            2 :         let user3: RoleName = "user3".into();
     422            2 :         let secret3 = Some(AuthSecret::Scram(ServerSecret::mock([3; 32])));
     423            2 :         cache.insert_role_secret(
     424            2 :             (&project_id).into(),
     425            2 :             (&endpoint_id).into(),
     426            2 :             (&user3).into(),
     427            2 :             secret3.clone(),
     428            2 :         );
     429            2 :         assert!(cache.get_role_secret(&endpoint_id, &user3).is_none());
     430            2 : 
     431            2 :         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
     432            2 :         assert!(cached.cached());
     433            2 :         assert_eq!(cached.value, allowed_ips);
     434            2 : 
     435            2 :         tokio::time::advance(Duration::from_secs(2)).await;
     436            2 :         let cached = cache.get_role_secret(&endpoint_id, &user1);
     437            2 :         assert!(cached.is_none());
     438            2 :         let cached = cache.get_role_secret(&endpoint_id, &user2);
     439            2 :         assert!(cached.is_none());
     440            2 :         let cached = cache.get_allowed_ips(&endpoint_id);
     441            2 :         assert!(cached.is_none());
     442            2 :     }
     443              : 
     444              :     #[tokio::test]
     445            2 :     async fn test_project_info_cache_invalidations() {
     446            2 :         tokio::time::pause();
     447            2 :         let cache = Arc::new(ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
     448            2 :             size: 2,
     449            2 :             max_roles: 2,
     450            2 :             ttl: Duration::from_secs(1),
     451            2 :             gc_interval: Duration::from_secs(600),
     452            2 :         }));
     453            2 :         cache.clone().increment_active_listeners().await;
     454            2 :         tokio::time::advance(Duration::from_secs(2)).await;
     455            2 : 
     456            2 :         let project_id: ProjectId = "project".into();
     457            2 :         let endpoint_id: EndpointId = "endpoint".into();
     458            2 :         let user1: RoleName = "user1".into();
     459            2 :         let user2: RoleName = "user2".into();
     460            2 :         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
     461            2 :         let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
     462            2 :         let allowed_ips = Arc::new(vec![
     463            2 :             "127.0.0.1".parse().unwrap(),
     464            2 :             "127.0.0.2".parse().unwrap(),
     465            2 :         ]);
     466            2 :         cache.insert_role_secret(
     467            2 :             (&project_id).into(),
     468            2 :             (&endpoint_id).into(),
     469            2 :             (&user1).into(),
     470            2 :             secret1.clone(),
     471            2 :         );
     472            2 :         cache.insert_role_secret(
     473            2 :             (&project_id).into(),
     474            2 :             (&endpoint_id).into(),
     475            2 :             (&user2).into(),
     476            2 :             secret2.clone(),
     477            2 :         );
     478            2 :         cache.insert_allowed_ips(
     479            2 :             (&project_id).into(),
     480            2 :             (&endpoint_id).into(),
     481            2 :             allowed_ips.clone(),
     482            2 :         );
     483            2 : 
     484            2 :         tokio::time::advance(Duration::from_secs(2)).await;
     485            2 :         // Nothing should be invalidated.
     486            2 : 
     487            2 :         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
     488            2 :         // TTL is disabled, so it should be impossible to invalidate this value.
     489            2 :         assert!(!cached.cached());
     490            2 :         assert_eq!(cached.value, secret1);
     491            2 : 
     492            2 :         cached.invalidate(); // Shouldn't do anything.
     493            2 :         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
     494            2 :         assert_eq!(cached.value, secret1);
     495            2 : 
     496            2 :         let cached = cache.get_role_secret(&endpoint_id, &user2).unwrap();
     497            2 :         assert!(!cached.cached());
     498            2 :         assert_eq!(cached.value, secret2);
     499            2 : 
     500            2 :         // The only way to invalidate this value is to invalidate via the api.
     501            2 :         cache.invalidate_role_secret_for_project((&project_id).into(), (&user2).into());
     502            2 :         assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
     503            2 : 
     504            2 :         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
     505            2 :         assert!(!cached.cached());
     506            2 :         assert_eq!(cached.value, allowed_ips);
     507            2 :     }
     508              : 
     509              :     #[tokio::test]
     510            2 :     async fn test_increment_active_listeners_invalidate_added_before() {
     511            2 :         tokio::time::pause();
     512            2 :         let cache = Arc::new(ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
     513            2 :             size: 2,
     514            2 :             max_roles: 2,
     515            2 :             ttl: Duration::from_secs(1),
     516            2 :             gc_interval: Duration::from_secs(600),
     517            2 :         }));
     518            2 : 
     519            2 :         let project_id: ProjectId = "project".into();
     520            2 :         let endpoint_id: EndpointId = "endpoint".into();
     521            2 :         let user1: RoleName = "user1".into();
     522            2 :         let user2: RoleName = "user2".into();
     523            2 :         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
     524            2 :         let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
     525            2 :         let allowed_ips = Arc::new(vec![
     526            2 :             "127.0.0.1".parse().unwrap(),
     527            2 :             "127.0.0.2".parse().unwrap(),
     528            2 :         ]);
     529            2 :         cache.insert_role_secret(
     530            2 :             (&project_id).into(),
     531            2 :             (&endpoint_id).into(),
     532            2 :             (&user1).into(),
     533            2 :             secret1.clone(),
     534            2 :         );
     535            2 :         cache.clone().increment_active_listeners().await;
     536            2 :         tokio::time::advance(Duration::from_millis(100)).await;
     537            2 :         cache.insert_role_secret(
     538            2 :             (&project_id).into(),
     539            2 :             (&endpoint_id).into(),
     540            2 :             (&user2).into(),
     541            2 :             secret2.clone(),
     542            2 :         );
     543            2 : 
     544            2 :         // Added before ttl was disabled + ttl should be still cached.
     545            2 :         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
     546            2 :         assert!(cached.cached());
     547            2 :         let cached = cache.get_role_secret(&endpoint_id, &user2).unwrap();
     548            2 :         assert!(cached.cached());
     549            2 : 
     550            2 :         tokio::time::advance(Duration::from_secs(1)).await;
     551            2 :         // Added before ttl was disabled + ttl should expire.
     552            2 :         assert!(cache.get_role_secret(&endpoint_id, &user1).is_none());
     553            2 :         assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
     554            2 : 
     555            2 :         // Added after ttl was disabled + ttl should not be cached.
     556            2 :         cache.insert_allowed_ips(
     557            2 :             (&project_id).into(),
     558            2 :             (&endpoint_id).into(),
     559            2 :             allowed_ips.clone(),
     560            2 :         );
     561            2 :         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
     562            2 :         assert!(!cached.cached());
     563            2 : 
     564            2 :         tokio::time::advance(Duration::from_secs(1)).await;
     565            2 :         // Added before ttl was disabled + ttl still should expire.
     566            2 :         assert!(cache.get_role_secret(&endpoint_id, &user1).is_none());
     567            2 :         assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
     568            2 :         // Shouldn't be invalidated.
     569            2 : 
     570            2 :         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
     571            2 :         assert!(!cached.cached());
     572            2 :         assert_eq!(cached.value, allowed_ips);
     573            2 :     }
     574              : }
        

Generated by: LCOV version 2.1-beta