Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt::Display;
3 : use std::sync::Arc;
4 : use std::time::{Duration, Instant};
5 :
6 : use crate::service::RECONCILE_TIMEOUT;
7 :
/// Duration above which the lock helpers in this file log a warning, both for
/// time spent waiting to acquire a lock and for time spent holding one.
/// Defined as the same value as `RECONCILE_TIMEOUT`.
const LOCK_TIMEOUT_ALERT_THRESHOLD: Duration = RECONCILE_TIMEOUT;
9 :
10 : /// A wrapper around `OwnedRwLockWriteGuard` used for tracking the
11 : /// operation that holds the lock, and print a warning if it exceeds
12 : /// the LOCK_TIMEOUT_ALERT_THRESHOLD time
13 : pub struct TracingExclusiveGuard<T: Display> {
14 : guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>,
15 : start: Instant,
16 : }
17 :
18 : impl<T: Display> TracingExclusiveGuard<T> {
19 1 : pub fn new(guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>) -> Self {
20 1 : Self {
21 1 : guard,
22 1 : start: Instant::now(),
23 1 : }
24 1 : }
25 : }
26 :
27 : impl<T: Display> Drop for TracingExclusiveGuard<T> {
28 1 : fn drop(&mut self) {
29 1 : let duration = self.start.elapsed();
30 1 : if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
31 0 : tracing::warn!(
32 0 : "Exclusive lock by {} was held for {:?}",
33 0 : self.guard.as_ref().unwrap(),
34 : duration
35 : );
36 1 : }
37 1 : *self.guard = None;
38 1 : }
39 : }
40 :
41 : // A wrapper around `OwnedRwLockReadGuard` used for tracking the
42 : /// operation that holds the lock, and print a warning if it exceeds
43 : /// the LOCK_TIMEOUT_ALERT_THRESHOLD time
44 : pub struct TracingSharedGuard<T: Display> {
45 : _guard: tokio::sync::OwnedRwLockReadGuard<Option<T>>,
46 : operation: T,
47 : start: Instant,
48 : }
49 :
50 : impl<T: Display> TracingSharedGuard<T> {
51 3 : pub fn new(guard: tokio::sync::OwnedRwLockReadGuard<Option<T>>, operation: T) -> Self {
52 3 : Self {
53 3 : _guard: guard,
54 3 : operation,
55 3 : start: Instant::now(),
56 3 : }
57 3 : }
58 : }
59 :
60 : impl<T: Display> Drop for TracingSharedGuard<T> {
61 3 : fn drop(&mut self) {
62 3 : let duration = self.start.elapsed();
63 3 : if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
64 0 : tracing::warn!(
65 0 : "Shared lock by {} was held for {:?}",
66 : self.operation,
67 : duration
68 : );
69 3 : }
70 3 : }
71 : }
72 :
73 : /// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
74 : /// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
75 : /// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
76 : /// is needed at a tenant-wide granularity.
77 : pub(crate) struct IdLockMap<T, I>
78 : where
79 : T: Eq + PartialEq + std::hash::Hash,
80 : {
81 : /// A synchronous lock for getting/setting the async locks that our callers will wait on.
82 : entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<Option<I>>>>>,
83 : }
84 :
85 : impl<T, I> IdLockMap<T, I>
86 : where
87 : T: Eq + PartialEq + std::hash::Hash,
88 : I: Display,
89 : {
90 3 : pub(crate) fn shared(
91 3 : &self,
92 3 : key: T,
93 3 : operation: I,
94 3 : ) -> impl std::future::Future<Output = TracingSharedGuard<I>> {
95 3 : let mut locked = self.entities.lock().unwrap();
96 3 : let entry = locked.entry(key).or_default().clone();
97 3 : async move { TracingSharedGuard::new(entry.read_owned().await, operation) }
98 3 : }
99 :
100 2 : pub(crate) fn exclusive(
101 2 : &self,
102 2 : key: T,
103 2 : operation: I,
104 2 : ) -> impl std::future::Future<Output = TracingExclusiveGuard<I>> {
105 2 : let mut locked = self.entities.lock().unwrap();
106 2 : let entry = locked.entry(key).or_default().clone();
107 2 : async move {
108 2 : let mut guard = TracingExclusiveGuard::new(entry.write_owned().await);
109 1 : *guard.guard = Some(operation);
110 1 : guard
111 1 : }
112 2 : }
113 :
114 0 : pub(crate) fn try_exclusive(&self, key: T, operation: I) -> Option<TracingExclusiveGuard<I>> {
115 0 : let mut locked = self.entities.lock().unwrap();
116 0 : let entry = locked.entry(key).or_default().clone();
117 0 : let mut guard = TracingExclusiveGuard::new(entry.try_write_owned().ok()?);
118 0 : *guard.guard = Some(operation);
119 0 : Some(guard)
120 0 : }
121 :
122 : /// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
123 : /// periodic housekeeping to avoid the map growing indefinitely
124 0 : pub(crate) fn housekeeping(&self) {
125 0 : let mut locked = self.entities.lock().unwrap();
126 0 : locked.retain(|_k, entry| entry.try_write().is_err())
127 0 : }
128 : }
129 :
130 : impl<T, I> Default for IdLockMap<T, I>
131 : where
132 : T: Eq + PartialEq + std::hash::Hash,
133 : {
134 2 : fn default() -> Self {
135 2 : Self {
136 2 : entities: std::sync::Mutex::new(HashMap::new()),
137 2 : }
138 2 : }
139 : }
140 :
141 0 : pub async fn trace_exclusive_lock<
142 0 : T: Clone + Display + Eq + PartialEq + std::hash::Hash,
143 0 : I: Clone + Display,
144 0 : >(
145 0 : op_locks: &IdLockMap<T, I>,
146 0 : key: T,
147 0 : operation: I,
148 0 : ) -> TracingExclusiveGuard<I> {
149 0 : let start = Instant::now();
150 0 : let guard = op_locks.exclusive(key.clone(), operation.clone()).await;
151 :
152 0 : let duration = start.elapsed();
153 0 : if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
154 0 : tracing::warn!(
155 0 : "Operation {} on key {} has waited {:?} for exclusive lock",
156 : operation,
157 : key,
158 : duration
159 : );
160 0 : }
161 :
162 0 : guard
163 0 : }
164 :
165 0 : pub async fn trace_shared_lock<
166 0 : T: Clone + Display + Eq + PartialEq + std::hash::Hash,
167 0 : I: Clone + Display,
168 0 : >(
169 0 : op_locks: &IdLockMap<T, I>,
170 0 : key: T,
171 0 : operation: I,
172 0 : ) -> TracingSharedGuard<I> {
173 0 : let start = Instant::now();
174 0 : let guard = op_locks.shared(key.clone(), operation.clone()).await;
175 :
176 0 : let duration = start.elapsed();
177 0 : if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
178 0 : tracing::warn!(
179 0 : "Operation {} on key {} has waited {:?} for shared lock",
180 : operation,
181 : key,
182 : duration
183 : );
184 0 : }
185 :
186 0 : guard
187 0 : }
188 :
#[cfg(test)]
mod tests {
    use super::IdLockMap;

    /// Stand-in operation labels used to tag lock holders in the tests.
    #[derive(Clone, Debug, strum_macros::Display, PartialEq)]
    enum Operations {
        Op1,
        Op2,
    }

    /// Two shared locks on the same key can be held at once, each keeping
    /// the operation it was taken for.
    #[tokio::test]
    async fn multiple_shared_locks() {
        let locks: IdLockMap<i32, Operations> = IdLockMap::default();

        let first = locks.shared(1, Operations::Op1).await;
        let second = locks.shared(1, Operations::Op2).await;

        assert_eq!(first.operation, Operations::Op1);
        assert_eq!(second.operation, Operations::Op2);
    }

    /// A second exclusive lock on the same key cannot be obtained while the
    /// first is held; once it is released, a shared lock succeeds.
    #[tokio::test]
    async fn exclusive_locks() {
        let locks = IdLockMap::default();
        let resource_id = 1;

        {
            let held = locks.exclusive(resource_id, Operations::Op1).await;
            assert_eq!(held.guard.as_ref(), Some(&Operations::Op1));

            // `held` is still alive, so this attempt must time out.
            let second_attempt = tokio::time::timeout(
                tokio::time::Duration::from_millis(1),
                locks.exclusive(resource_id, Operations::Op2),
            )
            .await;
            assert!(second_attempt.is_err());
        }

        let shared = locks.shared(resource_id, Operations::Op1).await;
        assert_eq!(shared.operation, Operations::Op1);
    }
}
|