Line data Source code
1 : use std::sync::Arc;
2 : use std::sync::atomic::{AtomicBool, Ordering};
3 : use std::time::Duration;
4 :
5 : /// Gates are a concurrency helper, primarily used for implementing safe shutdown.
6 : ///
7 : /// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
8 : /// the resource calls `close()` when they want to ensure that all holders of guards
9 : /// have released them, and that no future guards will be issued.
10 : pub struct Gate {
11 : inner: Arc<GateInner>,
12 : }
13 :
14 : impl std::fmt::Debug for Gate {
15 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16 0 : f.debug_struct("Gate")
17 0 : // use this for identification
18 0 : .field("ptr", &Arc::as_ptr(&self.inner))
19 0 : .field("inner", &self.inner)
20 0 : .finish()
21 0 : }
22 : }
23 :
24 : struct GateInner {
25 : sem: tokio::sync::Semaphore,
26 : closing: std::sync::atomic::AtomicBool,
27 : }
28 :
29 : impl std::fmt::Debug for GateInner {
30 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 0 : let avail = self.sem.available_permits();
32 0 :
33 0 : let guards = u32::try_from(avail)
34 0 : .ok()
35 0 : // the sem only supports 32-bit ish amount, but lets play it safe
36 0 : .and_then(|x| Gate::MAX_UNITS.checked_sub(x));
37 0 :
38 0 : let closing = self.closing.load(Ordering::Relaxed);
39 :
40 0 : if let Some(guards) = guards {
41 0 : f.debug_struct("Gate")
42 0 : .field("remaining_guards", &guards)
43 0 : .field("closing", &closing)
44 0 : .finish()
45 : } else {
46 0 : f.debug_struct("Gate")
47 0 : .field("avail_permits", &avail)
48 0 : .field("closing", &closing)
49 0 : .finish()
50 : }
51 0 : }
52 : }
53 :
54 : /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
55 : /// not complete.
56 : #[derive(Debug)]
57 : pub struct GateGuard {
58 : // Record the span where the gate was entered, so that we can identify who was blocking Gate::close
59 : span_at_enter: tracing::Span,
60 : gate: Arc<GateInner>,
61 : }
62 :
63 : impl GateGuard {
64 3 : pub fn try_clone(&self) -> Result<Self, GateError> {
65 3 : Gate::enter_impl(self.gate.clone())
66 3 : }
67 : }
68 :
69 : impl Drop for GateGuard {
70 8690 : fn drop(&mut self) {
71 8690 : if self.gate.closing.load(Ordering::Relaxed) {
72 27 : self.span_at_enter.in_scope(
73 27 : || tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
74 27 : );
75 8663 : }
76 :
77 : // when the permit was acquired, it was forgotten to allow us to manage it's lifecycle
78 : // manually, so "return" the permit now.
79 8690 : self.gate.sem.add_permits(1);
80 8690 : }
81 : }
82 :
83 : #[derive(Debug, thiserror::Error)]
84 : pub enum GateError {
85 : #[error("gate is closed")]
86 : GateClosed,
87 : }
88 :
89 : impl Default for Gate {
90 1509 : fn default() -> Self {
91 1509 : Self {
92 1509 : inner: Arc::new(GateInner {
93 1509 : sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
94 1509 : closing: AtomicBool::new(false),
95 1509 : }),
96 1509 : }
97 1509 : }
98 : }
99 :
100 : impl Gate {
101 : const MAX_UNITS: u32 = u32::MAX;
102 :
103 : /// Acquire a guard that will prevent close() calls from completing. If close()
104 : /// was already called, this will return an error which should be interpreted
105 : /// as "shutting down".
106 : ///
107 : /// This function would typically be used from e.g. request handlers. While holding
108 : /// the guard returned from this function, it is important to respect a CancellationToken
109 : /// to avoid blocking close() indefinitely: typically types that contain a Gate will
110 : /// also contain a CancellationToken.
111 9826 : pub fn enter(&self) -> Result<GateGuard, GateError> {
112 9826 : Self::enter_impl(self.inner.clone())
113 9826 : }
114 :
115 9829 : fn enter_impl(gate: Arc<GateInner>) -> Result<GateGuard, GateError> {
116 9829 : let permit = gate.sem.try_acquire().map_err(|_| GateError::GateClosed)?;
117 :
118 : // we now have the permit, let's disable the normal raii functionality and leave
119 : // "returning" the permit to our GateGuard::drop.
120 : //
121 : // this is done to avoid the need for multiple Arcs (one for semaphore, next for other
122 : // fields).
123 9818 : permit.forget();
124 9818 :
125 9818 : Ok(GateGuard {
126 9818 : span_at_enter: tracing::Span::current(),
127 9818 : gate,
128 9818 : })
129 9829 : }
130 :
131 : /// Types with a shutdown() method and a gate should call this method at the
132 : /// end of shutdown, to ensure that all GateGuard holders are done.
133 : ///
134 : /// This will wait for all guards to be destroyed. For this to complete promptly, it is
135 : /// important that the holders of such guards are respecting a CancellationToken which has
136 : /// been cancelled before entering this function.
137 72 : pub async fn close(&self) {
138 72 : let started_at = std::time::Instant::now();
139 72 : let mut do_close = std::pin::pin!(self.do_close());
140 72 :
141 72 : // with 1s we rarely saw anything, let's try if we get more gate closing reasons with 100ms
142 72 : let nag_after = Duration::from_millis(100);
143 :
144 72 : let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
145 50 : return;
146 : };
147 :
148 22 : tracing::info!(
149 0 : gate = ?self.as_ptr(),
150 0 : elapsed_ms = started_at.elapsed().as_millis(),
151 0 : "closing is taking longer than expected"
152 : );
153 :
154 : // close operation is not trying to be cancellation safe as pageserver does not need it.
155 : //
156 : // note: "closing" is not checked in Gate::enter -- it exists just for observability,
157 : // dropping of GateGuard after this will log who they were.
158 22 : self.inner.closing.store(true, Ordering::Relaxed);
159 22 :
160 22 : do_close.await;
161 :
162 6 : tracing::info!(
163 0 : gate = ?self.as_ptr(),
164 0 : elapsed_ms = started_at.elapsed().as_millis(),
165 0 : "close completed"
166 : );
167 4 : }
168 :
169 : /// Used as an identity of a gate. This identity will be resolved to something useful when
170 : /// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
171 : /// more.
172 : ///
173 : /// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
174 : /// open for too long.
175 24 : fn as_ptr(&self) -> *const GateInner {
176 24 : Arc::as_ptr(&self.inner)
177 24 : }
178 :
179 : /// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
180 : /// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
181 : /// the CancellationToken on such types is analogous to "Did shutdown start?"
182 4 : pub fn close_complete(&self) -> bool {
183 4 : self.inner.sem.is_closed()
184 4 : }
185 :
186 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
187 : async fn do_close(&self) {
188 : tracing::debug!("Closing Gate...");
189 :
190 : match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
191 : Ok(_permit) => {
192 : // While holding all units, close the semaphore. All subsequent calls to enter() will fail.
193 : self.inner.sem.close();
194 : }
195 : Err(_closed) => {
196 : // Semaphore closed: we are the only function that can do this, so it indicates a double-call.
197 : // This is legal. Timeline::shutdown for example is not protected from being called more than
198 : // once.
199 : tracing::debug!("Double close")
200 : }
201 : }
202 : tracing::debug!("Closed Gate.")
203 : }
204 : }
205 :
#[cfg(test)]
mod tests {
    use super::*;

    /// A gate that never handed out a guard closes without blocking.
    #[tokio::test]
    async fn close_unused() {
        let unused = Gate::default();
        unused.close().await;
    }

    /// A guard that was already released does not block a later close.
    #[tokio::test]
    async fn close_idle() {
        let gate = Gate::default();

        // Take a guard and release it again right away.
        drop(gate.enter().unwrap());

        gate.close().await;

        // Entering a closed gate fails
        let after_close = gate.enter();
        after_close.expect_err("enter should fail after close");
    }

    /// Close waits for an outstanding guard and rejects new entrants meanwhile.
    #[tokio::test(start_paused = true)]
    async fn close_busy_gate() {
        let forever = Duration::from_secs(24 * 7 * 365);
        let gate = Gate::default();

        let holder_span = tracing::info_span!("i am holding back the gate");
        let guard = holder_span.in_scope(|| gate.enter().unwrap());

        let mut close_fut = std::pin::pin!(gate.close());

        // Close has to wait while the guard is alive, so the timeout must fire.
        let first_wait = tokio::time::timeout(forever, &mut close_fut).await;
        first_wait.unwrap_err();

        // Entering is refused as soon as close is underway, even though it has not
        // finished yet.
        let during_close = gate.enter();
        during_close.expect_err("enter should fail after entering close");

        // Dropping the guard now logs; that can only be verified manually.
        drop(guard);

        // With the guard gone, close runs to completion.
        close_fut.await;

        // ...and entering stays forbidden afterwards.
        let after_close = gate.enter();
        after_close.expect_err("enter should fail finishing close");
    }

    /// Guards produced by `try_clone` keep the gate open independently of the original.
    #[tokio::test(start_paused = true)]
    async fn clone_gate_guard() {
        let forever = Duration::from_secs(24 * 7 * 365);
        let gate = Gate::default();

        let guard1 = gate.enter().expect("gate isn't closed");
        let guard2 = guard1.try_clone().expect("gate isn't clsoed");

        let mut close_fut = std::pin::pin!(gate.close());

        let first_wait = tokio::time::timeout(forever, &mut close_fut).await;
        first_wait.unwrap_err();

        // close_fut has been polled once; from now on every enter and clone must fail.
        gate.enter().unwrap_err();
        guard1.try_clone().unwrap_err();
        guard2.try_clone().unwrap_err();

        // Releasing guard1 is not enough: guard2 still keeps the gate open.
        drop(guard1);
        let second_wait = tokio::time::timeout(forever, &mut close_fut).await;
        second_wait.unwrap_err();

        drop(guard2);

        // The last guard is gone, so closing completes.
        close_fut.await;

        // Entering remains forbidden.
        let after_close = gate.enter();
        after_close.expect_err("enter should stilll fail");
    }
}
|