Line data Source code
1 : use std::{
2 : sync::{
3 : atomic::{AtomicBool, Ordering},
4 : Arc,
5 : },
6 : time::Duration,
7 : };
8 :
9 : /// Gates are a concurrency helper, primarily used for implementing safe shutdown.
10 : ///
11 : /// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
12 : /// the resource calls `close()` when they want to ensure that all holders of guards
13 : /// have released them, and that no future guards will be issued.
14 : pub struct Gate {
15 : inner: Arc<GateInner>,
16 : }
17 :
18 : impl std::fmt::Debug for Gate {
19 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 0 : f.debug_struct("Gate")
21 0 : // use this for identification
22 0 : .field("ptr", &Arc::as_ptr(&self.inner))
23 0 : .field("inner", &self.inner)
24 0 : .finish()
25 0 : }
26 : }
27 :
28 : struct GateInner {
29 : sem: tokio::sync::Semaphore,
30 : closing: std::sync::atomic::AtomicBool,
31 : }
32 :
33 : impl std::fmt::Debug for GateInner {
34 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 0 : let avail = self.sem.available_permits();
36 0 :
37 0 : let guards = u32::try_from(avail)
38 0 : .ok()
39 0 : // the sem only supports 32-bit ish amount, but lets play it safe
40 0 : .and_then(|x| Gate::MAX_UNITS.checked_sub(x));
41 0 :
42 0 : let closing = self.closing.load(Ordering::Relaxed);
43 :
44 0 : if let Some(guards) = guards {
45 0 : f.debug_struct("Gate")
46 0 : .field("remaining_guards", &guards)
47 0 : .field("closing", &closing)
48 0 : .finish()
49 : } else {
50 0 : f.debug_struct("Gate")
51 0 : .field("avail_permits", &avail)
52 0 : .field("closing", &closing)
53 0 : .finish()
54 : }
55 0 : }
56 : }
57 :
58 : /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
59 : /// not complete.
60 : #[derive(Debug)]
61 : pub struct GateGuard {
62 : // Record the span where the gate was entered, so that we can identify who was blocking Gate::close
63 : span_at_enter: tracing::Span,
64 : gate: Arc<GateInner>,
65 : }
66 :
67 : impl Drop for GateGuard {
68 5618 : fn drop(&mut self) {
69 5618 : if self.gate.closing.load(Ordering::Relaxed) {
70 25 : self.span_at_enter.in_scope(
71 25 : || tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
72 25 : );
73 5593 : }
74 :
75 : // when the permit was acquired, it was forgotten to allow us to manage it's lifecycle
76 : // manually, so "return" the permit now.
77 5618 : self.gate.sem.add_permits(1);
78 5618 : }
79 : }
80 :
81 0 : #[derive(Debug, thiserror::Error)]
82 : pub enum GateError {
83 : #[error("gate is closed")]
84 : GateClosed,
85 : }
86 :
87 : impl Default for Gate {
88 1917 : fn default() -> Self {
89 1917 : Self {
90 1917 : inner: Arc::new(GateInner {
91 1917 : sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
92 1917 : closing: AtomicBool::new(false),
93 1917 : }),
94 1917 : }
95 1917 : }
96 : }
97 :
98 : impl Gate {
99 : const MAX_UNITS: u32 = u32::MAX;
100 :
101 : /// Acquire a guard that will prevent close() calls from completing. If close()
102 : /// was already called, this will return an error which should be interpreted
103 : /// as "shutting down".
104 : ///
105 : /// This function would typically be used from e.g. request handlers. While holding
106 : /// the guard returned from this function, it is important to respect a CancellationToken
107 : /// to avoid blocking close() indefinitely: typically types that contain a Gate will
108 : /// also contain a CancellationToken.
109 7217 : pub fn enter(&self) -> Result<GateGuard, GateError> {
110 7217 : let permit = self
111 7217 : .inner
112 7217 : .sem
113 7217 : .try_acquire()
114 7217 : .map_err(|_| GateError::GateClosed)?;
115 :
116 : // we now have the permit, let's disable the normal raii functionality and leave
117 : // "returning" the permit to our GateGuard::drop.
118 : //
119 : // this is done to avoid the need for multiple Arcs (one for semaphore, next for other
120 : // fields).
121 7208 : permit.forget();
122 7208 :
123 7208 : Ok(GateGuard {
124 7208 : span_at_enter: tracing::Span::current(),
125 7208 : gate: self.inner.clone(),
126 7208 : })
127 7217 : }
128 :
129 : /// Types with a shutdown() method and a gate should call this method at the
130 : /// end of shutdown, to ensure that all GateGuard holders are done.
131 : ///
132 : /// This will wait for all guards to be destroyed. For this to complete promptly, it is
133 : /// important that the holders of such guards are respecting a CancellationToken which has
134 : /// been cancelled before entering this function.
135 87 : pub async fn close(&self) {
136 87 : let started_at = std::time::Instant::now();
137 87 : let mut do_close = std::pin::pin!(self.do_close());
138 87 :
139 87 : // with 1s we rarely saw anything, let's try if we get more gate closing reasons with 100ms
140 87 : let nag_after = Duration::from_millis(100);
141 :
142 87 : let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
143 62 : return;
144 : };
145 :
146 25 : tracing::info!(
147 0 : gate = ?self.as_ptr(),
148 0 : elapsed_ms = started_at.elapsed().as_millis(),
149 0 : "closing is taking longer than expected"
150 : );
151 :
152 : // close operation is not trying to be cancellation safe as pageserver does not need it.
153 : //
154 : // note: "closing" is not checked in Gate::enter -- it exists just for observability,
155 : // dropping of GateGuard after this will log who they were.
156 25 : self.inner.closing.store(true, Ordering::Relaxed);
157 25 :
158 25 : do_close.await;
159 :
160 7 : tracing::info!(
161 0 : gate = ?self.as_ptr(),
162 0 : elapsed_ms = started_at.elapsed().as_millis(),
163 0 : "close completed"
164 : );
165 69 : }
166 :
167 : /// Used as an identity of a gate. This identity will be resolved to something useful when
168 : /// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
169 : /// more.
170 : ///
171 : /// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
172 : /// open for too long.
173 27 : fn as_ptr(&self) -> *const GateInner {
174 27 : Arc::as_ptr(&self.inner)
175 27 : }
176 :
177 : /// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
178 : /// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
179 : /// the CancellationToken on such types is analogous to "Did shutdown start?"
180 6 : pub fn close_complete(&self) -> bool {
181 6 : self.inner.sem.is_closed()
182 6 : }
183 :
184 138 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
185 : async fn do_close(&self) {
186 : tracing::debug!("Closing Gate...");
187 :
188 : match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
189 : Ok(_permit) => {
190 : // While holding all units, close the semaphore. All subsequent calls to enter() will fail.
191 : self.inner.sem.close();
192 : }
193 : Err(_closed) => {
194 : // Semaphore closed: we are the only function that can do this, so it indicates a double-call.
195 : // This is legal. Timeline::shutdown for example is not protected from being called more than
196 : // once.
197 : tracing::debug!("Double close")
198 : }
199 : }
200 : tracing::debug!("Closed Gate.")
201 : }
202 : }
203 :
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn close_unused() {
        // A gate that never handed out a guard closes without waiting.
        let gate = Gate::default();
        gate.close().await;
        assert!(gate.close_complete());
    }

    #[tokio::test]
    async fn close_idle() {
        // A guard dropped before close() starts does not block it.
        let gate = Gate::default();
        let guard = gate.enter().unwrap();
        drop(guard);
        gate.close().await;
        assert!(gate.close_complete());

        // Entering a closed gate fails.
        gate.enter().expect_err("enter should fail after close");
    }

    #[tokio::test]
    async fn close_twice() {
        // A second close() after the first completed takes the "Double close" path and must
        // neither hang nor reopen the gate.
        let gate = Gate::default();
        gate.close().await;
        gate.close().await;
        assert!(gate.close_complete());
        gate.enter().expect_err("enter should fail after double close");
    }

    #[tokio::test(start_paused = true)]
    async fn close_busy_gate() {
        let gate = Gate::default();
        // About one year, far beyond close()'s 100ms nag threshold. The previous value,
        // 24 * 7 * 365 seconds, was only about 17 hours. The runtime clock is paused, so
        // waiting on this costs no wall-clock time.
        let forever = Duration::from_secs(365 * 24 * 60 * 60);

        let guard =
            tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap());

        let mut close_fut = std::pin::pin!(gate.close());

        // Close should be waiting for the outstanding guard to drop.
        tokio::time::timeout(forever, &mut close_fut)
            .await
            .unwrap_err();
        assert!(!gate.close_complete());

        // Attempting to enter() should fail, even though close isn't done yet.
        gate.enter()
            .expect_err("enter should fail after entering close");

        // This will now log, which we cannot verify except manually.
        drop(guard);

        // The guard is gone, so close should finish.
        close_fut.await;
        assert!(gate.close_complete());

        // Attempting to enter() is still forbidden.
        gate.enter().expect_err("enter should fail finishing close");
    }
}
|