LCOV - code coverage report
Current view: top level - storage_controller/src - id_lock_map.rs (source / functions) Coverage Total Hit
Test: c789ec21f6053d4c25d2419c4a34ed298d5f69f5.info Lines: 61.1 % 108 66
Test Date: 2024-06-20 08:12:09 Functions: 30.3 % 33 10

            Line data    Source code
       1              : use std::fmt::Display;
       2              : use std::time::Instant;
       3              : use std::{collections::HashMap, sync::Arc};
       4              : 
       5              : use std::time::Duration;
       6              : 
       7              : use crate::service::RECONCILE_TIMEOUT;
       8              : 
       9              : const LOCK_TIMEOUT_ALERT_THRESHOLD: Duration = RECONCILE_TIMEOUT;
      10              : 
      11              : /// A wrapper around `OwnedRwLockWriteGuard` that when dropped changes the
      12              : /// current holding operation in lock.
      13              : pub struct WrappedWriteGuard<T: Display> {
      14              :     guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>,
      15              :     start: Instant,
      16              : }
      17              : 
      18              : impl<T: Display> WrappedWriteGuard<T> {
      19            2 :     pub fn new(guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>) -> Self {
      20            2 :         Self {
      21            2 :             guard,
      22            2 :             start: Instant::now(),
      23            2 :         }
      24            2 :     }
      25              : }
      26              : 
      27              : impl<T: Display> Drop for WrappedWriteGuard<T> {
      28            2 :     fn drop(&mut self) {
      29            2 :         let duration = self.start.elapsed();
      30            2 :         if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
      31            0 :             tracing::warn!(
      32            0 :                 "Lock on {} was held for {:?}",
      33            0 :                 self.guard.as_ref().unwrap(),
      34              :                 duration
      35              :             );
      36            2 :         }
      37            2 :         *self.guard = None;
      38            2 :     }
      39              : }
      40              : 
      41              : /// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
      42              : /// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
      43              : /// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
      44              : /// is needed at a tenant-wide granularity.
      45              : pub(crate) struct IdLockMap<T, I>
      46              : where
      47              :     T: Eq + PartialEq + std::hash::Hash,
      48              : {
      49              :     /// A synchronous lock for getting/setting the async locks that our callers will wait on.
      50              :     entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<Option<I>>>>>,
      51              : }
      52              : 
      53              : impl<T, I> IdLockMap<T, I>
      54              : where
      55              :     T: Eq + PartialEq + std::hash::Hash,
      56              :     I: Display,
      57              : {
      58            6 :     pub(crate) fn shared(
      59            6 :         &self,
      60            6 :         key: T,
      61            6 :     ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<Option<I>>> {
      62            6 :         let mut locked = self.entities.lock().unwrap();
      63            6 :         let entry = locked.entry(key).or_default();
      64            6 :         entry.clone().read_owned()
      65            6 :     }
      66              : 
      67            4 :     pub(crate) fn exclusive(
      68            4 :         &self,
      69            4 :         key: T,
      70            4 :         operation: I,
      71            4 :     ) -> impl std::future::Future<Output = WrappedWriteGuard<I>> {
      72            4 :         let mut locked = self.entities.lock().unwrap();
      73            4 :         let entry = locked.entry(key).or_default().clone();
      74            4 :         async move {
      75            4 :             let mut guard = WrappedWriteGuard::new(entry.clone().write_owned().await);
      76            2 :             *guard.guard = Some(operation);
      77            2 :             guard
      78            2 :         }
      79            4 :     }
      80              : 
      81              :     /// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
      82              :     /// periodic housekeeping to avoid the map growing indefinitely
      83            0 :     pub(crate) fn housekeeping(&self) {
      84            0 :         let mut locked = self.entities.lock().unwrap();
      85            0 :         locked.retain(|_k, entry| entry.try_write().is_err())
      86            0 :     }
      87              : }
      88              : 
      89              : impl<T, I> Default for IdLockMap<T, I>
      90              : where
      91              :     T: Eq + PartialEq + std::hash::Hash,
      92              : {
      93            4 :     fn default() -> Self {
      94            4 :         Self {
      95            4 :             entities: std::sync::Mutex::new(HashMap::new()),
      96            4 :         }
      97            4 :     }
      98              : }
      99              : 
     100            0 : pub async fn trace_exclusive_lock<
     101            0 :     T: Clone + Display + Eq + PartialEq + std::hash::Hash,
     102            0 :     I: Display + Clone,
     103            0 : >(
     104            0 :     op_locks: &IdLockMap<T, I>,
     105            0 :     key: T,
     106            0 :     operation: I,
     107            0 : ) -> WrappedWriteGuard<I> {
     108            0 :     let start = Instant::now();
     109            0 :     let guard = op_locks.exclusive(key.clone(), operation.clone()).await;
     110              : 
     111            0 :     let duration = start.elapsed();
     112            0 :     if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
     113            0 :         tracing::warn!(
     114            0 :             "Operation {} on key {} has waited {:?} for exclusive lock",
     115              :             operation,
     116              :             key,
     117              :             duration
     118              :         );
     119            0 :     }
     120              : 
     121            0 :     guard
     122            0 : }
     123              : 
     124            0 : pub async fn trace_shared_lock<
     125            0 :     T: Clone + Display + Eq + PartialEq + std::hash::Hash,
     126            0 :     I: Display,
     127            0 : >(
     128            0 :     op_locks: &IdLockMap<T, I>,
     129            0 :     key: T,
     130            0 :     operation: I,
     131            0 : ) -> tokio::sync::OwnedRwLockReadGuard<Option<I>> {
     132            0 :     let start = Instant::now();
     133            0 :     let guard = op_locks.shared(key.clone()).await;
     134              : 
     135            0 :     let duration = start.elapsed();
     136            0 :     if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
     137            0 :         tracing::warn!(
     138            0 :             "Operation {} on key {} has waited {:?} for shared lock",
     139              :             operation,
     140              :             key,
     141              :             duration
     142              :         );
     143            0 :     }
     144              : 
     145            0 :     guard
     146            0 : }
     147              : 
     148              : #[cfg(test)]
     149              : mod tests {
     150              :     use super::IdLockMap;
     151              : 
     152            0 :     #[derive(Clone, Debug, strum_macros::Display, PartialEq)]
     153              :     enum Operations {
     154              :         Op1,
     155              :         Op2,
     156              :     }
     157              : 
     158              :     #[tokio::test]
     159            2 :     async fn multiple_shared_locks() {
     160            2 :         let id_lock_map: IdLockMap<i32, Operations> = IdLockMap::default();
     161            2 : 
     162            2 :         let shared_lock_1 = id_lock_map.shared(1).await;
     163            2 :         let shared_lock_2 = id_lock_map.shared(1).await;
     164            2 : 
     165            2 :         assert!(shared_lock_1.is_none());
     166            2 :         assert!(shared_lock_2.is_none());
     167            2 :     }
     168              : 
     169              :     #[tokio::test]
     170            2 :     async fn exclusive_locks() {
     171            2 :         let id_lock_map = IdLockMap::default();
     172            2 :         let resource_id = 1;
     173            2 : 
     174            2 :         {
     175            2 :             let _ex_lock = id_lock_map.exclusive(resource_id, Operations::Op1).await;
     176            2 :             assert_eq!(_ex_lock.guard.clone().unwrap(), Operations::Op1);
     177            2 : 
     178            2 :             let _ex_lock_2 = tokio::time::timeout(
     179            2 :                 tokio::time::Duration::from_millis(1),
     180            2 :                 id_lock_map.exclusive(resource_id, Operations::Op2),
     181            2 :             )
     182            2 :             .await;
     183            2 :             assert!(_ex_lock_2.is_err());
     184            2 :         }
     185            2 : 
     186            2 :         let shared_lock_1 = id_lock_map.shared(resource_id).await;
     187            2 :         assert!(shared_lock_1.is_none());
     188            2 :     }
     189              : }
        

Generated by: LCOV version 2.1-beta