Line data Source code
1 : use std::fmt::Display;
2 : use std::time::Instant;
3 : use std::{collections::HashMap, sync::Arc};
4 :
5 : use std::time::Duration;
6 :
7 : use crate::service::RECONCILE_TIMEOUT;
8 :
9 : const LOCK_TIMEOUT_ALERT_THRESHOLD: Duration = RECONCILE_TIMEOUT;
10 :
11 : /// A wrapper around `OwnedRwLockWriteGuard` that when dropped changes the
12 : /// current holding operation in lock.
13 : pub struct WrappedWriteGuard<T: Display> {
14 : guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>,
15 : start: Instant,
16 : }
17 :
18 : impl<T: Display> WrappedWriteGuard<T> {
19 2 : pub fn new(guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>) -> Self {
20 2 : Self {
21 2 : guard,
22 2 : start: Instant::now(),
23 2 : }
24 2 : }
25 : }
26 :
27 : impl<T: Display> Drop for WrappedWriteGuard<T> {
28 2 : fn drop(&mut self) {
29 2 : let duration = self.start.elapsed();
30 2 : if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
31 0 : tracing::warn!(
32 0 : "Lock on {} was held for {:?}",
33 0 : self.guard.as_ref().unwrap(),
34 : duration
35 : );
36 2 : }
37 2 : *self.guard = None;
38 2 : }
39 : }
40 :
41 : /// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
42 : /// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
43 : /// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
44 : /// is needed at a tenant-wide granularity.
45 : pub(crate) struct IdLockMap<T, I>
46 : where
47 : T: Eq + PartialEq + std::hash::Hash,
48 : {
49 : /// A synchronous lock for getting/setting the async locks that our callers will wait on.
50 : entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<Option<I>>>>>,
51 : }
52 :
53 : impl<T, I> IdLockMap<T, I>
54 : where
55 : T: Eq + PartialEq + std::hash::Hash,
56 : I: Display,
57 : {
58 6 : pub(crate) fn shared(
59 6 : &self,
60 6 : key: T,
61 6 : ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<Option<I>>> {
62 6 : let mut locked = self.entities.lock().unwrap();
63 6 : let entry = locked.entry(key).or_default();
64 6 : entry.clone().read_owned()
65 6 : }
66 :
67 4 : pub(crate) fn exclusive(
68 4 : &self,
69 4 : key: T,
70 4 : operation: I,
71 4 : ) -> impl std::future::Future<Output = WrappedWriteGuard<I>> {
72 4 : let mut locked = self.entities.lock().unwrap();
73 4 : let entry = locked.entry(key).or_default().clone();
74 4 : async move {
75 4 : let mut guard = WrappedWriteGuard::new(entry.clone().write_owned().await);
76 2 : *guard.guard = Some(operation);
77 2 : guard
78 2 : }
79 4 : }
80 :
81 : /// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
82 : /// periodic housekeeping to avoid the map growing indefinitely
83 0 : pub(crate) fn housekeeping(&self) {
84 0 : let mut locked = self.entities.lock().unwrap();
85 0 : locked.retain(|_k, entry| entry.try_write().is_err())
86 0 : }
87 : }
88 :
89 : impl<T, I> Default for IdLockMap<T, I>
90 : where
91 : T: Eq + PartialEq + std::hash::Hash,
92 : {
93 4 : fn default() -> Self {
94 4 : Self {
95 4 : entities: std::sync::Mutex::new(HashMap::new()),
96 4 : }
97 4 : }
98 : }
99 :
100 0 : pub async fn trace_exclusive_lock<
101 0 : T: Clone + Display + Eq + PartialEq + std::hash::Hash,
102 0 : I: Display + Clone,
103 0 : >(
104 0 : op_locks: &IdLockMap<T, I>,
105 0 : key: T,
106 0 : operation: I,
107 0 : ) -> WrappedWriteGuard<I> {
108 0 : let start = Instant::now();
109 0 : let guard = op_locks.exclusive(key.clone(), operation.clone()).await;
110 :
111 0 : let duration = start.elapsed();
112 0 : if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
113 0 : tracing::warn!(
114 0 : "Operation {} on key {} has waited {:?} for exclusive lock",
115 : operation,
116 : key,
117 : duration
118 : );
119 0 : }
120 :
121 0 : guard
122 0 : }
123 :
124 0 : pub async fn trace_shared_lock<
125 0 : T: Clone + Display + Eq + PartialEq + std::hash::Hash,
126 0 : I: Display,
127 0 : >(
128 0 : op_locks: &IdLockMap<T, I>,
129 0 : key: T,
130 0 : operation: I,
131 0 : ) -> tokio::sync::OwnedRwLockReadGuard<Option<I>> {
132 0 : let start = Instant::now();
133 0 : let guard = op_locks.shared(key.clone()).await;
134 :
135 0 : let duration = start.elapsed();
136 0 : if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
137 0 : tracing::warn!(
138 0 : "Operation {} on key {} has waited {:?} for shared lock",
139 : operation,
140 : key,
141 : duration
142 : );
143 0 : }
144 :
145 0 : guard
146 0 : }
147 :
#[cfg(test)]
mod tests {
    use super::IdLockMap;

    #[derive(Clone, Debug, strum_macros::Display, PartialEq)]
    enum Operations {
        Op1,
        Op2,
    }

    /// Several readers can hold the same key at once, and none of them sees an operation.
    #[tokio::test]
    async fn multiple_shared_locks() {
        let locks: IdLockMap<i32, Operations> = IdLockMap::default();

        let first_reader = locks.shared(1).await;
        let second_reader = locks.shared(1).await;

        assert_eq!(*first_reader, None);
        assert_eq!(*second_reader, None);
    }

    /// An exclusive holder records its operation and blocks other writers; once it is
    /// released, a reader can take the key and sees no recorded operation.
    #[tokio::test]
    async fn exclusive_locks() {
        let locks = IdLockMap::default();
        let resource_id = 1;

        {
            let writer = locks.exclusive(resource_id, Operations::Op1).await;
            assert_eq!(*writer.guard, Some(Operations::Op1));

            let contender = tokio::time::timeout(
                tokio::time::Duration::from_millis(1),
                locks.exclusive(resource_id, Operations::Op2),
            )
            .await;
            assert!(contender.is_err());
        }

        let reader = locks.shared(resource_id).await;
        assert_eq!(*reader, None);
    }
}
|