Line data Source code
1 : use std::sync::Arc;
2 : use std::sync::atomic::{AtomicBool, Ordering};
3 : use std::time::Duration;
4 :
5 : /// Gates are a concurrency helper, primarily used for implementing safe shutdown.
6 : ///
7 : /// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
8 : /// the resource calls `close()` when they want to ensure that all holders of guards
9 : /// have released them, and that no future guards will be issued.
10 : pub struct Gate {
11 : inner: Arc<GateInner>,
12 : }
13 :
14 : impl std::fmt::Debug for Gate {
15 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16 0 : f.debug_struct("Gate")
17 0 : // use this for identification
18 0 : .field("ptr", &Arc::as_ptr(&self.inner))
19 0 : .field("inner", &self.inner)
20 0 : .finish()
21 0 : }
22 : }
23 :
24 : struct GateInner {
25 : sem: tokio::sync::Semaphore,
26 : closing: std::sync::atomic::AtomicBool,
27 : }
28 :
29 : impl std::fmt::Debug for GateInner {
30 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 0 : let avail = self.sem.available_permits();
32 :
33 0 : let guards = u32::try_from(avail)
34 0 : .ok()
35 : // the sem only supports 32-bit ish amount, but lets play it safe
36 0 : .and_then(|x| Gate::MAX_UNITS.checked_sub(x));
37 :
38 0 : let closing = self.closing.load(Ordering::Relaxed);
39 :
40 0 : if let Some(guards) = guards {
41 0 : f.debug_struct("Gate")
42 0 : .field("remaining_guards", &guards)
43 0 : .field("closing", &closing)
44 0 : .finish()
45 : } else {
46 0 : f.debug_struct("Gate")
47 0 : .field("avail_permits", &avail)
48 0 : .field("closing", &closing)
49 0 : .finish()
50 : }
51 0 : }
52 : }
53 :
54 : /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
55 : /// not complete.
56 : #[derive(Debug)]
57 : pub struct GateGuard {
58 : // Record the span where the gate was entered, so that we can identify who was blocking Gate::close
59 : span_at_enter: tracing::Span,
60 : gate: Arc<GateInner>,
61 : }
62 :
63 : impl GateGuard {
64 3 : pub fn try_clone(&self) -> Result<Self, GateError> {
65 3 : Gate::enter_impl(self.gate.clone())
66 3 : }
67 : }
68 :
69 : impl Drop for GateGuard {
70 4588 : fn drop(&mut self) {
71 4588 : if self.gate.closing.load(Ordering::Relaxed) {
72 18 : self.span_at_enter.in_scope(
73 0 : || tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
74 : );
75 4570 : }
76 :
77 : // when the permit was acquired, it was forgotten to allow us to manage it's lifecycle
78 : // manually, so "return" the permit now.
79 4588 : self.gate.sem.add_permits(1);
80 4588 : }
81 : }
82 :
83 : #[derive(Debug, thiserror::Error)]
84 : pub enum GateError {
85 : #[error("gate is closed")]
86 : GateClosed,
87 : }
88 :
89 : impl GateError {
90 0 : pub fn is_cancel(&self) -> bool {
91 0 : match self {
92 0 : GateError::GateClosed => true,
93 : }
94 0 : }
95 : }
96 :
97 : impl Default for Gate {
98 415 : fn default() -> Self {
99 415 : Self {
100 415 : inner: Arc::new(GateInner {
101 415 : sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
102 415 : closing: AtomicBool::new(false),
103 415 : }),
104 415 : }
105 415 : }
106 : }
107 :
108 : impl Gate {
109 : const MAX_UNITS: u32 = u32::MAX;
110 :
111 : /// Acquire a guard that will prevent close() calls from completing. If close()
112 : /// was already called, this will return an error which should be interpreted
113 : /// as "shutting down".
114 : ///
115 : /// This function would typically be used from e.g. request handlers. While holding
116 : /// the guard returned from this function, it is important to respect a CancellationToken
117 : /// to avoid blocking close() indefinitely: typically types that contain a Gate will
118 : /// also contain a CancellationToken.
119 4897 : pub fn enter(&self) -> Result<GateGuard, GateError> {
120 4897 : Self::enter_impl(self.inner.clone())
121 4897 : }
122 :
123 4900 : fn enter_impl(gate: Arc<GateInner>) -> Result<GateGuard, GateError> {
124 4900 : let permit = gate.sem.try_acquire().map_err(|_| GateError::GateClosed)?;
125 :
126 : // we now have the permit, let's disable the normal raii functionality and leave
127 : // "returning" the permit to our GateGuard::drop.
128 : //
129 : // this is done to avoid the need for multiple Arcs (one for semaphore, next for other
130 : // fields).
131 4883 : permit.forget();
132 :
133 4883 : Ok(GateGuard {
134 4883 : span_at_enter: tracing::Span::current(),
135 4883 : gate,
136 4883 : })
137 4900 : }
138 :
139 : /// Types with a shutdown() method and a gate should call this method at the
140 : /// end of shutdown, to ensure that all GateGuard holders are done.
141 : ///
142 : /// This will wait for all guards to be destroyed. For this to complete promptly, it is
143 : /// important that the holders of such guards are respecting a CancellationToken which has
144 : /// been cancelled before entering this function.
145 21 : pub async fn close(&self) {
146 21 : let started_at = std::time::Instant::now();
147 21 : let mut do_close = std::pin::pin!(self.do_close());
148 :
149 : // with 1s we rarely saw anything, let's try if we get more gate closing reasons with 100ms
150 21 : let nag_after = Duration::from_millis(100);
151 :
152 21 : let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
153 14 : return;
154 : };
155 :
156 7 : tracing::info!(
157 0 : gate = ?self.as_ptr(),
158 0 : elapsed_ms = started_at.elapsed().as_millis(),
159 0 : "closing is taking longer than expected"
160 : );
161 :
162 : // close operation is not trying to be cancellation safe as pageserver does not need it.
163 : //
164 : // note: "closing" is not checked in Gate::enter -- it exists just for observability,
165 : // dropping of GateGuard after this will log who they were.
166 7 : self.inner.closing.store(true, Ordering::Relaxed);
167 :
168 7 : do_close.await;
169 :
170 3 : tracing::info!(
171 0 : gate = ?self.as_ptr(),
172 0 : elapsed_ms = started_at.elapsed().as_millis(),
173 0 : "close completed"
174 : );
175 17 : }
176 :
177 : /// Used as an identity of a gate. This identity will be resolved to something useful when
178 : /// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
179 : /// more.
180 : ///
181 : /// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
182 : /// open for too long.
183 9 : fn as_ptr(&self) -> *const GateInner {
184 9 : Arc::as_ptr(&self.inner)
185 9 : }
186 :
187 : /// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
188 : /// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
189 : /// the CancellationToken on such types is analogous to "Did shutdown start?"
190 1 : pub fn close_complete(&self) -> bool {
191 1 : self.inner.sem.is_closed()
192 1 : }
193 :
194 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
195 : async fn do_close(&self) {
196 : tracing::debug!("Closing Gate...");
197 :
198 : match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
199 : Ok(_permit) => {
200 : // While holding all units, close the semaphore. All subsequent calls to enter() will fail.
201 : self.inner.sem.close();
202 : }
203 : Err(_closed) => {
204 : // Semaphore closed: we are the only function that can do this, so it indicates a double-call.
205 : // This is legal. Timeline::shutdown for example is not protected from being called more than
206 : // once.
207 : tracing::debug!("Double close")
208 : }
209 : }
210 : tracing::debug!("Closed Gate.")
211 : }
212 : }
213 :
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn close_unused() {
        // Having taken no guards, we should not be blocked in close
        let gate = Gate::default();
        gate.close().await;
    }

    #[tokio::test]
    async fn close_idle() {
        // If a guard is dropped before closing, close should not be blocked
        let gate = Gate::default();
        let guard = gate.enter().unwrap();
        drop(guard);
        gate.close().await;

        // Entering a closed gate fails
        gate.enter().expect_err("enter should fail after close");
    }

    #[tokio::test]
    async fn close_twice_reports_completion() {
        // close_complete() tracks whether close() finished, and closing twice is legal
        let gate = Gate::default();
        assert!(!gate.close_complete());

        gate.close().await;
        assert!(gate.close_complete());

        // A second close must return promptly instead of waiting forever
        gate.close().await;
        assert!(gate.close_complete());

        gate.enter()
            .expect_err("enter should fail after double close");
    }

    #[tokio::test(start_paused = true)]
    async fn close_busy_gate() {
        let gate = Gate::default();
        let forever = Duration::from_secs(24 * 7 * 365);

        let guard =
            tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap());

        let mut close_fut = std::pin::pin!(gate.close());

        // Close should be waiting for guards to drop
        tokio::time::timeout(forever, &mut close_fut)
            .await
            .unwrap_err();
        assert!(!gate.close_complete());

        // Attempting to enter() should fail, even though close isn't done yet.
        gate.enter()
            .expect_err("enter should fail after entering close");

        // this will now log, which we cannot verify except manually
        drop(guard);

        // Guard is gone, close should finish
        close_fut.await;
        assert!(gate.close_complete());

        // Attempting to enter() is still forbidden
        gate.enter()
            .expect_err("enter should fail after finishing close");
    }

    #[tokio::test(start_paused = true)]
    async fn clone_gate_guard() {
        let gate = Gate::default();
        let forever = Duration::from_secs(24 * 7 * 365);

        let guard1 = gate.enter().expect("gate isn't closed");

        let guard2 = guard1.try_clone().expect("gate isn't closed");

        let mut close_fut = std::pin::pin!(gate.close());

        tokio::time::timeout(forever, &mut close_fut)
            .await
            .unwrap_err();

        // we polled close_fut once, that should prevent all later enters and clones
        gate.enter().unwrap_err();
        guard1.try_clone().unwrap_err();
        guard2.try_clone().unwrap_err();

        // guard2 keeps gate open even if guard1 is closed
        drop(guard1);
        tokio::time::timeout(forever, &mut close_fut)
            .await
            .unwrap_err();

        drop(guard2);

        // now that the last guard is dropped, closing should complete
        close_fut.await;

        // entering is still forbidden
        gate.enter().expect_err("enter should still fail");
    }
}
|