Line data Source code
1 : use std::{
2 : sync::{
3 : atomic::{AtomicBool, Ordering},
4 : Arc,
5 : },
6 : time::Duration,
7 : };
8 :
9 : /// Gates are a concurrency helper, primarily used for implementing safe shutdown.
10 : ///
11 : /// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
12 : /// the resource calls `close()` when they want to ensure that all holders of guards
13 : /// have released them, and that no future guards will be issued.
14 : pub struct Gate {
15 : inner: Arc<GateInner>,
16 : }
17 :
18 : impl std::fmt::Debug for Gate {
19 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 0 : f.debug_struct("Gate")
21 0 : // use this for identification
22 0 : .field("ptr", &Arc::as_ptr(&self.inner))
23 0 : .field("inner", &self.inner)
24 0 : .finish()
25 0 : }
26 : }
27 :
28 : struct GateInner {
29 : sem: tokio::sync::Semaphore,
30 : closing: std::sync::atomic::AtomicBool,
31 : }
32 :
33 : impl std::fmt::Debug for GateInner {
34 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 0 : let avail = self.sem.available_permits();
36 0 :
37 0 : let guards = u32::try_from(avail)
38 0 : .ok()
39 0 : // the sem only supports 32-bit ish amount, but lets play it safe
40 0 : .and_then(|x| Gate::MAX_UNITS.checked_sub(x));
41 0 :
42 0 : let closing = self.closing.load(Ordering::Relaxed);
43 :
44 0 : if let Some(guards) = guards {
45 0 : f.debug_struct("Gate")
46 0 : .field("remaining_guards", &guards)
47 0 : .field("closing", &closing)
48 0 : .finish()
49 : } else {
50 0 : f.debug_struct("Gate")
51 0 : .field("avail_permits", &avail)
52 0 : .field("closing", &closing)
53 0 : .finish()
54 : }
55 0 : }
56 : }
57 :
58 : /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
59 : /// not complete.
60 0 : #[derive(Debug)]
61 : pub struct GateGuard {
62 : // Record the span where the gate was entered, so that we can identify who was blocking Gate::close
63 : span_at_enter: tracing::Span,
64 : gate: Arc<GateInner>,
65 : }
66 :
67 : impl Drop for GateGuard {
68 15895 : fn drop(&mut self) {
69 15895 : if self.gate.closing.load(Ordering::Relaxed) {
70 2 : self.span_at_enter.in_scope(
71 2 : || tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
72 2 : );
73 15893 : }
74 :
75 : // when the permit was acquired, it was forgotten to allow us to manage it's lifecycle
76 : // manually, so "return" the permit now.
77 15895 : self.gate.sem.add_permits(1);
78 15895 : }
79 : }
80 :
/// Error returned by [`Gate::enter`].
#[derive(Debug)]
pub enum GateError {
    /// The gate has been closed, or closing has started. Callers should treat this as
    /// "shutting down".
    GateClosed,
}
85 :
86 : impl Default for Gate {
87 2548 : fn default() -> Self {
88 2548 : Self {
89 2548 : inner: Arc::new(GateInner {
90 2548 : sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
91 2548 : closing: AtomicBool::new(false),
92 2548 : }),
93 2548 : }
94 2548 : }
95 : }
96 :
97 : impl Gate {
98 : const MAX_UNITS: u32 = u32::MAX;
99 :
100 : /// Acquire a guard that will prevent close() calls from completing. If close()
101 : /// was already called, this will return an error which should be interpreted
102 : /// as "shutting down".
103 : ///
104 : /// This function would typically be used from e.g. request handlers. While holding
105 : /// the guard returned from this function, it is important to respect a CancellationToken
106 : /// to avoid blocking close() indefinitely: typically types that contain a Gate will
107 : /// also contain a CancellationToken.
108 16906 : pub fn enter(&self) -> Result<GateGuard, GateError> {
109 16906 : let permit = self
110 16906 : .inner
111 16906 : .sem
112 16906 : .try_acquire()
113 16906 : .map_err(|_| GateError::GateClosed)?;
114 :
115 : // we now have the permit, let's disable the normal raii functionality and leave
116 : // "returning" the permit to our GateGuard::drop.
117 : //
118 : // this is done to avoid the need for multiple Arcs (one for semaphore, next for other
119 : // fields).
120 16900 : permit.forget();
121 16900 :
122 16900 : Ok(GateGuard {
123 16900 : span_at_enter: tracing::Span::current(),
124 16900 : gate: self.inner.clone(),
125 16900 : })
126 16906 : }
127 :
128 : /// Types with a shutdown() method and a gate should call this method at the
129 : /// end of shutdown, to ensure that all GateGuard holders are done.
130 : ///
131 : /// This will wait for all guards to be destroyed. For this to complete promptly, it is
132 : /// important that the holders of such guards are respecting a CancellationToken which has
133 : /// been cancelled before entering this function.
134 1245 : pub async fn close(&self) {
135 1245 : let started_at = std::time::Instant::now();
136 1245 : let mut do_close = std::pin::pin!(self.do_close());
137 1245 :
138 1245 : let nag_after = Duration::from_secs(1);
139 :
140 1245 : let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
141 1243 : return;
142 : };
143 :
144 0 : tracing::info!(
145 0 : gate = ?self.as_ptr(),
146 0 : elapsed_ms = started_at.elapsed().as_millis(),
147 0 : "closing is taking longer than expected"
148 0 : );
149 :
150 : // close operation is not trying to be cancellation safe as pageserver does not need it.
151 : //
152 : // note: "closing" is not checked in Gate::enter -- it exists just for observability,
153 : // dropping of GateGuard after this will log who they were.
154 2 : self.inner.closing.store(true, Ordering::Relaxed);
155 2 :
156 4 : do_close.await;
157 :
158 0 : tracing::info!(
159 0 : gate = ?self.as_ptr(),
160 0 : elapsed_ms = started_at.elapsed().as_millis(),
161 0 : "close completed"
162 0 : );
163 1245 : }
164 :
165 : /// Used as an identity of a gate. This identity will be resolved to something useful when
166 : /// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
167 : /// more.
168 : ///
169 : /// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
170 : /// open for too long.
171 6 : fn as_ptr(&self) -> *const GateInner {
172 6 : Arc::as_ptr(&self.inner)
173 6 : }
174 :
175 : /// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
176 : /// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
177 : /// the CancellationToken on such types is analogous to "Did shutdown start?"
178 177 : pub fn close_complete(&self) -> bool {
179 177 : self.inner.sem.is_closed()
180 177 : }
181 :
182 1245 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
183 : async fn do_close(&self) {
184 0 : tracing::debug!("Closing Gate...");
185 :
186 : match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
187 : Ok(_permit) => {
188 : // While holding all units, close the semaphore. All subsequent calls to enter() will fail.
189 : self.inner.sem.close();
190 : }
191 : Err(_closed) => {
192 : // Semaphore closed: we are the only function that can do this, so it indicates a double-call.
193 : // This is legal. Timeline::shutdown for example is not protected from being called more than
194 : // once.
195 0 : tracing::debug!("Double close")
196 : }
197 : }
198 0 : tracing::debug!("Closed Gate.")
199 : }
200 : }
201 :
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn close_unused() {
        // Having taken no guards, we should not be blocked in close
        let gate = Gate::default();
        gate.close().await;
    }

    #[tokio::test]
    async fn close_idle() {
        // If a guard is dropped before closing, close should not be blocked
        let gate = Gate::default();
        let guard = gate.enter().unwrap();
        drop(guard);
        gate.close().await;

        // Entering a closed gate fails
        gate.enter().expect_err("enter should fail after close");
    }

    #[tokio::test]
    async fn close_twice() {
        // Calling close() more than once is legal and must not hang.
        let gate = Gate::default();
        assert!(!gate.close_complete());
        gate.close().await;
        assert!(gate.close_complete());
        gate.close().await;
        assert!(gate.close_complete());
    }

    #[tokio::test(start_paused = true)]
    async fn close_busy_gate() {
        let gate = Gate::default();
        // With paused time the runtime auto-advances the clock. This only needs to be far
        // beyond close()'s 1s nag interval; one year is effectively "forever".
        let forever = Duration::from_secs(365 * 24 * 60 * 60);

        let guard =
            tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap());

        let mut close_fut = std::pin::pin!(gate.close());

        // Close should be waiting for guards to drop
        tokio::time::timeout(forever, &mut close_fut)
            .await
            .unwrap_err();

        // Attempting to enter() should fail, even though close isn't done yet.
        gate.enter()
            .expect_err("enter should fail after entering close");

        // this will now log, which we cannot verify except manually
        drop(guard);

        // Guard is gone, close should finish
        close_fut.await;

        // Attempting to enter() is still forbidden
        gate.enter().expect_err("enter should fail finishing close");
    }
}
|