Line data Source code
1 : use std::{
2 : sync::{
3 : atomic::{AtomicBool, Ordering},
4 : Arc,
5 : },
6 : time::Duration,
7 : };
8 :
9 : /// Gates are a concurrency helper, primarily used for implementing safe shutdown.
10 : ///
11 : /// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
12 : /// the resource calls `close()` when they want to ensure that all holders of guards
13 : /// have released them, and that no future guards will be issued.
14 : pub struct Gate {
15 : inner: Arc<GateInner>,
16 : }
17 :
18 : impl std::fmt::Debug for Gate {
19 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 0 : f.debug_struct("Gate")
21 0 : // use this for identification
22 0 : .field("ptr", &Arc::as_ptr(&self.inner))
23 0 : .field("inner", &self.inner)
24 0 : .finish()
25 0 : }
26 : }
27 :
28 : struct GateInner {
29 : sem: tokio::sync::Semaphore,
30 : closing: std::sync::atomic::AtomicBool,
31 : }
32 :
33 : impl std::fmt::Debug for GateInner {
34 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 0 : let avail = self.sem.available_permits();
36 0 :
37 0 : let guards = u32::try_from(avail)
38 0 : .ok()
39 0 : // the sem only supports 32-bit ish amount, but lets play it safe
40 0 : .and_then(|x| Gate::MAX_UNITS.checked_sub(x));
41 0 :
42 0 : let closing = self.closing.load(Ordering::Relaxed);
43 :
44 0 : if let Some(guards) = guards {
45 0 : f.debug_struct("Gate")
46 0 : .field("remaining_guards", &guards)
47 0 : .field("closing", &closing)
48 0 : .finish()
49 : } else {
50 0 : f.debug_struct("Gate")
51 0 : .field("avail_permits", &avail)
52 0 : .field("closing", &closing)
53 0 : .finish()
54 : }
55 0 : }
56 : }
57 :
58 : /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
59 : /// not complete.
60 : #[derive(Debug)]
61 : pub struct GateGuard {
62 : // Record the span where the gate was entered, so that we can identify who was blocking Gate::close
63 : span_at_enter: tracing::Span,
64 : gate: Arc<GateInner>,
65 : }
66 :
67 : impl GateGuard {
68 3 : pub fn try_clone(&self) -> Result<Self, GateError> {
69 3 : Gate::enter_impl(self.gate.clone())
70 3 : }
71 : }
72 :
73 : impl Drop for GateGuard {
74 8597 : fn drop(&mut self) {
75 8597 : if self.gate.closing.load(Ordering::Relaxed) {
76 27 : self.span_at_enter.in_scope(
77 27 : || tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
78 27 : );
79 8570 : }
80 :
81 : // when the permit was acquired, it was forgotten to allow us to manage it's lifecycle
82 : // manually, so "return" the permit now.
83 8597 : self.gate.sem.add_permits(1);
84 8597 : }
85 : }
86 :
87 : #[derive(Debug, thiserror::Error)]
88 : pub enum GateError {
89 : #[error("gate is closed")]
90 : GateClosed,
91 : }
92 :
93 : impl Default for Gate {
94 1487 : fn default() -> Self {
95 1487 : Self {
96 1487 : inner: Arc::new(GateInner {
97 1487 : sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
98 1487 : closing: AtomicBool::new(false),
99 1487 : }),
100 1487 : }
101 1487 : }
102 : }
103 :
104 : impl Gate {
105 : const MAX_UNITS: u32 = u32::MAX;
106 :
107 : /// Acquire a guard that will prevent close() calls from completing. If close()
108 : /// was already called, this will return an error which should be interpreted
109 : /// as "shutting down".
110 : ///
111 : /// This function would typically be used from e.g. request handlers. While holding
112 : /// the guard returned from this function, it is important to respect a CancellationToken
113 : /// to avoid blocking close() indefinitely: typically types that contain a Gate will
114 : /// also contain a CancellationToken.
115 9717 : pub fn enter(&self) -> Result<GateGuard, GateError> {
116 9717 : Self::enter_impl(self.inner.clone())
117 9717 : }
118 :
119 9720 : fn enter_impl(gate: Arc<GateInner>) -> Result<GateGuard, GateError> {
120 9720 : let permit = gate.sem.try_acquire().map_err(|_| GateError::GateClosed)?;
121 :
122 : // we now have the permit, let's disable the normal raii functionality and leave
123 : // "returning" the permit to our GateGuard::drop.
124 : //
125 : // this is done to avoid the need for multiple Arcs (one for semaphore, next for other
126 : // fields).
127 9709 : permit.forget();
128 9709 :
129 9709 : Ok(GateGuard {
130 9709 : span_at_enter: tracing::Span::current(),
131 9709 : gate,
132 9709 : })
133 9720 : }
134 :
135 : /// Types with a shutdown() method and a gate should call this method at the
136 : /// end of shutdown, to ensure that all GateGuard holders are done.
137 : ///
138 : /// This will wait for all guards to be destroyed. For this to complete promptly, it is
139 : /// important that the holders of such guards are respecting a CancellationToken which has
140 : /// been cancelled before entering this function.
141 72 : pub async fn close(&self) {
142 72 : let started_at = std::time::Instant::now();
143 72 : let mut do_close = std::pin::pin!(self.do_close());
144 72 :
145 72 : // with 1s we rarely saw anything, let's try if we get more gate closing reasons with 100ms
146 72 : let nag_after = Duration::from_millis(100);
147 :
148 72 : let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
149 50 : return;
150 : };
151 :
152 22 : tracing::info!(
153 0 : gate = ?self.as_ptr(),
154 0 : elapsed_ms = started_at.elapsed().as_millis(),
155 0 : "closing is taking longer than expected"
156 : );
157 :
158 : // close operation is not trying to be cancellation safe as pageserver does not need it.
159 : //
160 : // note: "closing" is not checked in Gate::enter -- it exists just for observability,
161 : // dropping of GateGuard after this will log who they were.
162 22 : self.inner.closing.store(true, Ordering::Relaxed);
163 22 :
164 22 : do_close.await;
165 :
166 6 : tracing::info!(
167 0 : gate = ?self.as_ptr(),
168 0 : elapsed_ms = started_at.elapsed().as_millis(),
169 0 : "close completed"
170 : );
171 56 : }
172 :
173 : /// Used as an identity of a gate. This identity will be resolved to something useful when
174 : /// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
175 : /// more.
176 : ///
177 : /// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
178 : /// open for too long.
179 24 : fn as_ptr(&self) -> *const GateInner {
180 24 : Arc::as_ptr(&self.inner)
181 24 : }
182 :
183 : /// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
184 : /// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
185 : /// the CancellationToken on such types is analogous to "Did shutdown start?"
186 4 : pub fn close_complete(&self) -> bool {
187 4 : self.inner.sem.is_closed()
188 4 : }
189 :
190 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
191 : async fn do_close(&self) {
192 : tracing::debug!("Closing Gate...");
193 :
194 : match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
195 : Ok(_permit) => {
196 : // While holding all units, close the semaphore. All subsequent calls to enter() will fail.
197 : self.inner.sem.close();
198 : }
199 : Err(_closed) => {
200 : // Semaphore closed: we are the only function that can do this, so it indicates a double-call.
201 : // This is legal. Timeline::shutdown for example is not protected from being called more than
202 : // once.
203 : tracing::debug!("Double close")
204 : }
205 : }
206 : tracing::debug!("Closed Gate.")
207 : }
208 : }
209 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Closing a gate that never issued a guard completes immediately.
    #[tokio::test]
    async fn close_unused() {
        // Having taken no guards, we should not be blocked in close
        let gate = Gate::default();
        gate.close().await;
    }

    /// A guard that was already dropped does not hold up close, and a closed gate
    /// refuses new entries.
    #[tokio::test]
    async fn close_idle() {
        // If a guard is dropped before closing, close should not be blocked
        let gate = Gate::default();
        let guard = gate.enter().unwrap();
        drop(guard);
        gate.close().await;

        // Entering a closed gate fails
        gate.enter().expect_err("enter should fail after close");
    }

    /// Close waits for an outstanding guard, and refuses new entries while it waits.
    #[tokio::test(start_paused = true)]
    async fn close_busy_gate() {
        let gate = Gate::default();
        // Not literally forever (roughly 17 hours), but the test clock starts paused, so this
        // only needs to be comfortably longer than the nag threshold inside `close`.
        let forever = Duration::from_secs(24 * 7 * 365);

        let guard =
            tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap());

        let mut close_fut = std::pin::pin!(gate.close());

        // Close should be waiting for guards to drop
        tokio::time::timeout(forever, &mut close_fut)
            .await
            .unwrap_err();

        // Attempting to enter() should fail, even though close isn't done yet.
        gate.enter()
            .expect_err("enter should fail after entering close");

        // this will now log, which we cannot verify except manually
        drop(guard);

        // Guard is gone, close should finish
        close_fut.await;

        // Attempting to enter() is still forbidden
        gate.enter()
            .expect_err("enter should fail after finishing close");
    }

    /// Guards obtained through `try_clone` are independent: each one keeps the gate open
    /// until it is dropped, and cloning is refused once close has started.
    #[tokio::test(start_paused = true)]
    async fn clone_gate_guard() {
        let gate = Gate::default();
        // See `close_busy_gate`: long enough to outlast the nag threshold on the paused clock.
        let forever = Duration::from_secs(24 * 7 * 365);

        let guard1 = gate.enter().expect("gate isn't closed");

        let guard2 = guard1.try_clone().expect("gate isn't closed");

        let mut close_fut = std::pin::pin!(gate.close());

        tokio::time::timeout(forever, &mut close_fut)
            .await
            .unwrap_err();

        // close_fut has been polled, which should prevent all later enters and clones
        gate.enter().unwrap_err();
        guard1.try_clone().unwrap_err();
        guard2.try_clone().unwrap_err();

        // guard2 keeps the gate open even after guard1 is dropped
        drop(guard1);
        tokio::time::timeout(forever, &mut close_fut)
            .await
            .unwrap_err();

        drop(guard2);

        // now that the last guard is dropped, closing should complete
        close_fut.await;

        // entering is still forbidden
        gate.enter().expect_err("enter should still fail");
    }
}
|