LCOV - code coverage report
Current view: top level - storage_controller/src - id_lock_map.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 61.2 % 129 79
Test Date: 2025-03-12 18:28:53 Functions: 33.3 % 39 13

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::fmt::Display;
       3              : use std::sync::Arc;
       4              : use std::time::{Duration, Instant};
       5              : 
       6              : use crate::service::RECONCILE_TIMEOUT;
       7              : 
/// How long a lock may be waited for, or held, before a warning is logged.
/// Uses the same value as [`RECONCILE_TIMEOUT`].
const LOCK_TIMEOUT_ALERT_THRESHOLD: Duration = RECONCILE_TIMEOUT;
       9              : 
/// A wrapper around `OwnedRwLockWriteGuard` used for tracking the
/// operation that holds the lock, and for printing a warning if the lock
/// is held for longer than `LOCK_TIMEOUT_ALERT_THRESHOLD`.
pub struct TracingExclusiveGuard<T: Display> {
    /// The underlying write guard. The guarded slot stores the operation that
    /// currently holds the lock; it is reset to `None` when this guard is dropped.
    guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>,
    /// Moment this wrapper was created, used to measure how long the lock was held.
    start: Instant,
}
      17              : 
      18              : impl<T: Display> TracingExclusiveGuard<T> {
      19            1 :     pub fn new(guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>) -> Self {
      20            1 :         Self {
      21            1 :             guard,
      22            1 :             start: Instant::now(),
      23            1 :         }
      24            1 :     }
      25              : }
      26              : 
      27              : impl<T: Display> Drop for TracingExclusiveGuard<T> {
      28            1 :     fn drop(&mut self) {
      29            1 :         let duration = self.start.elapsed();
      30            1 :         if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
      31            0 :             tracing::warn!(
      32            0 :                 "Exclusive lock by {} was held for {:?}",
      33            0 :                 self.guard.as_ref().unwrap(),
      34              :                 duration
      35              :             );
      36            1 :         }
      37            1 :         *self.guard = None;
      38            1 :     }
      39              : }
      40              : 
/// A wrapper around `OwnedRwLockReadGuard` used for tracking the
/// operation that holds the lock, and for printing a warning if the lock
/// is held for longer than `LOCK_TIMEOUT_ALERT_THRESHOLD`.
pub struct TracingSharedGuard<T: Display> {
    /// The underlying read guard; never read, only kept alive so the lock stays held.
    _guard: tokio::sync::OwnedRwLockReadGuard<Option<T>>,
    /// The operation on whose behalf this shared lock is held (named in the warning).
    operation: T,
    /// Moment this wrapper was created, used to measure how long the lock was held.
    start: Instant,
}
      49              : 
      50              : impl<T: Display> TracingSharedGuard<T> {
      51            3 :     pub fn new(guard: tokio::sync::OwnedRwLockReadGuard<Option<T>>, operation: T) -> Self {
      52            3 :         Self {
      53            3 :             _guard: guard,
      54            3 :             operation,
      55            3 :             start: Instant::now(),
      56            3 :         }
      57            3 :     }
      58              : }
      59              : 
      60              : impl<T: Display> Drop for TracingSharedGuard<T> {
      61            3 :     fn drop(&mut self) {
      62            3 :         let duration = self.start.elapsed();
      63            3 :         if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
      64            0 :             tracing::warn!(
      65            0 :                 "Shared lock by {} was held for {:?}",
      66              :                 self.operation,
      67              :                 duration
      68              :             );
      69            3 :         }
      70            3 :     }
      71              : }
      72              : 
/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
/// is needed at a tenant-wide granularity.
///
/// `T` is the identifier (map key) type. `I` is the type describing the operation that takes a lock;
/// each lock guards an `Option<I>` slot.
pub(crate) struct IdLockMap<T, I>
where
    T: Eq + PartialEq + std::hash::Hash,
{
    /// A synchronous lock for getting/setting the async locks that our callers will wait on.
    entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<Option<I>>>>>,
}
      84              : 
      85              : impl<T, I> IdLockMap<T, I>
      86              : where
      87              :     T: Eq + PartialEq + std::hash::Hash,
      88              :     I: Display,
      89              : {
      90            3 :     pub(crate) fn shared(
      91            3 :         &self,
      92            3 :         key: T,
      93            3 :         operation: I,
      94            3 :     ) -> impl std::future::Future<Output = TracingSharedGuard<I>> {
      95            3 :         let mut locked = self.entities.lock().unwrap();
      96            3 :         let entry = locked.entry(key).or_default().clone();
      97            3 :         async move { TracingSharedGuard::new(entry.read_owned().await, operation) }
      98            3 :     }
      99              : 
     100            2 :     pub(crate) fn exclusive(
     101            2 :         &self,
     102            2 :         key: T,
     103            2 :         operation: I,
     104            2 :     ) -> impl std::future::Future<Output = TracingExclusiveGuard<I>> {
     105            2 :         let mut locked = self.entities.lock().unwrap();
     106            2 :         let entry = locked.entry(key).or_default().clone();
     107            2 :         async move {
     108            2 :             let mut guard = TracingExclusiveGuard::new(entry.write_owned().await);
     109            1 :             *guard.guard = Some(operation);
     110            1 :             guard
     111            1 :         }
     112            2 :     }
     113              : 
     114            0 :     pub(crate) fn try_exclusive(&self, key: T, operation: I) -> Option<TracingExclusiveGuard<I>> {
     115            0 :         let mut locked = self.entities.lock().unwrap();
     116            0 :         let entry = locked.entry(key).or_default().clone();
     117            0 :         let mut guard = TracingExclusiveGuard::new(entry.try_write_owned().ok()?);
     118            0 :         *guard.guard = Some(operation);
     119            0 :         Some(guard)
     120            0 :     }
     121              : 
     122              :     /// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
     123              :     /// periodic housekeeping to avoid the map growing indefinitely
     124            0 :     pub(crate) fn housekeeping(&self) {
     125            0 :         let mut locked = self.entities.lock().unwrap();
     126            0 :         locked.retain(|_k, entry| entry.try_write().is_err())
     127            0 :     }
     128              : }
     129              : 
     130              : impl<T, I> Default for IdLockMap<T, I>
     131              : where
     132              :     T: Eq + PartialEq + std::hash::Hash,
     133              : {
     134            2 :     fn default() -> Self {
     135            2 :         Self {
     136            2 :             entities: std::sync::Mutex::new(HashMap::new()),
     137            2 :         }
     138            2 :     }
     139              : }
     140              : 
     141            0 : pub async fn trace_exclusive_lock<
     142            0 :     T: Clone + Display + Eq + PartialEq + std::hash::Hash,
     143            0 :     I: Clone + Display,
     144            0 : >(
     145            0 :     op_locks: &IdLockMap<T, I>,
     146            0 :     key: T,
     147            0 :     operation: I,
     148            0 : ) -> TracingExclusiveGuard<I> {
     149            0 :     let start = Instant::now();
     150            0 :     let guard = op_locks.exclusive(key.clone(), operation.clone()).await;
     151              : 
     152            0 :     let duration = start.elapsed();
     153            0 :     if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
     154            0 :         tracing::warn!(
     155            0 :             "Operation {} on key {} has waited {:?} for exclusive lock",
     156              :             operation,
     157              :             key,
     158              :             duration
     159              :         );
     160            0 :     }
     161              : 
     162            0 :     guard
     163            0 : }
     164              : 
     165            0 : pub async fn trace_shared_lock<
     166            0 :     T: Clone + Display + Eq + PartialEq + std::hash::Hash,
     167            0 :     I: Clone + Display,
     168            0 : >(
     169            0 :     op_locks: &IdLockMap<T, I>,
     170            0 :     key: T,
     171            0 :     operation: I,
     172            0 : ) -> TracingSharedGuard<I> {
     173            0 :     let start = Instant::now();
     174            0 :     let guard = op_locks.shared(key.clone(), operation.clone()).await;
     175              : 
     176            0 :     let duration = start.elapsed();
     177            0 :     if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
     178            0 :         tracing::warn!(
     179            0 :             "Operation {} on key {} has waited {:?} for shared lock",
     180              :             operation,
     181              :             key,
     182              :             duration
     183              :         );
     184            0 :     }
     185              : 
     186            0 :     guard
     187            0 : }
     188              : 
#[cfg(test)]
mod tests {
    use super::IdLockMap;

    /// Minimal operation type used to label lock holders in the tests below.
    #[derive(Clone, Debug, strum_macros::Display, PartialEq)]
    enum Operations {
        Op1,
        Op2,
    }

    /// Two shared locks on the same key can be held at once, each remembering its own operation.
    #[tokio::test]
    async fn multiple_shared_locks() {
        let id_lock_map: IdLockMap<i32, Operations> = IdLockMap::default();

        let shared_lock_1 = id_lock_map.shared(1, Operations::Op1).await;
        let shared_lock_2 = id_lock_map.shared(1, Operations::Op2).await;

        assert_eq!(shared_lock_1.operation, Operations::Op1);
        assert_eq!(shared_lock_2.operation, Operations::Op2);
    }

    /// A second exclusive lock on the same key cannot be acquired while the first is held;
    /// once the first is dropped, the key can be locked again.
    #[tokio::test]
    async fn exclusive_locks() {
        let id_lock_map = IdLockMap::default();
        let resource_id = 1;

        {
            let _ex_lock = id_lock_map.exclusive(resource_id, Operations::Op1).await;
            assert_eq!(_ex_lock.guard.clone().unwrap(), Operations::Op1);

            // `_ex_lock` is still alive here, so this attempt must time out.
            let _ex_lock_2 = tokio::time::timeout(
                tokio::time::Duration::from_millis(1),
                id_lock_map.exclusive(resource_id, Operations::Op2),
            )
            .await;
            assert!(_ex_lock_2.is_err());
        }

        // The exclusive guard went out of scope above, so a shared lock is now obtainable.
        let shared_lock_1 = id_lock_map.shared(resource_id, Operations::Op1).await;
        assert_eq!(shared_lock_1.operation, Operations::Op1);
    }
}
        

Generated by: LCOV version 2.1-beta