LCOV - code coverage report
Current view: top level - libs/utils/src/sync - gate.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 72.8 % 125 91
Test Date: 2025-07-16 12:29:03 Functions: 78.3 % 23 18

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::sync::atomic::{AtomicBool, Ordering};
       3              : use std::time::Duration;
       4              : 
       5              : /// Gates are a concurrency helper, primarily used for implementing safe shutdown.
       6              : ///
       7              : /// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
       8              : /// the resource calls `close()` when they want to ensure that all holders of guards
       9              : /// have released them, and that no future guards will be issued.
      10              : pub struct Gate {
      11              :     inner: Arc<GateInner>,
      12              : }
      13              : 
      14              : impl std::fmt::Debug for Gate {
      15            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      16            0 :         f.debug_struct("Gate")
      17            0 :             // use this for identification
      18            0 :             .field("ptr", &Arc::as_ptr(&self.inner))
      19            0 :             .field("inner", &self.inner)
      20            0 :             .finish()
      21            0 :     }
      22              : }
      23              : 
      24              : struct GateInner {
      25              :     sem: tokio::sync::Semaphore,
      26              :     closing: std::sync::atomic::AtomicBool,
      27              : }
      28              : 
      29              : impl std::fmt::Debug for GateInner {
      30            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      31            0 :         let avail = self.sem.available_permits();
      32              : 
      33            0 :         let guards = u32::try_from(avail)
      34            0 :             .ok()
      35              :             // the sem only supports a 32-bit-ish amount, but let's play it safe
      36            0 :             .and_then(|x| Gate::MAX_UNITS.checked_sub(x));
      37              : 
      38            0 :         let closing = self.closing.load(Ordering::Relaxed);
      39              : 
      40            0 :         if let Some(guards) = guards {
      41            0 :             f.debug_struct("Gate")
      42            0 :                 .field("remaining_guards", &guards)
      43            0 :                 .field("closing", &closing)
      44            0 :                 .finish()
      45              :         } else {
      46            0 :             f.debug_struct("Gate")
      47            0 :                 .field("avail_permits", &avail)
      48            0 :                 .field("closing", &closing)
      49            0 :                 .finish()
      50              :         }
      51            0 :     }
      52              : }
      53              : 
      54              : /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
      55              : /// not complete.
      56              : #[derive(Debug)]
      57              : pub struct GateGuard {
      58              :     // Record the span where the gate was entered, so that we can identify who was blocking Gate::close
      59              :     span_at_enter: tracing::Span,
      60              :     gate: Arc<GateInner>,
      61              : }
      62              : 
      63              : impl GateGuard {
      64            3 :     pub fn try_clone(&self) -> Result<Self, GateError> {
      65            3 :         Gate::enter_impl(self.gate.clone())
      66            3 :     }
      67              : }
      68              : 
      69              : impl Drop for GateGuard {
      70         4588 :     fn drop(&mut self) {
      71         4588 :         if self.gate.closing.load(Ordering::Relaxed) {
      72           18 :             self.span_at_enter.in_scope(
      73            0 :                 || tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
      74              :             );
      75         4570 :         }
      76              : 
      77              :         // when the permit was acquired, it was forgotten to allow us to manage its lifecycle
      78              :         // manually, so "return" the permit now.
      79         4588 :         self.gate.sem.add_permits(1);
      80         4588 :     }
      81              : }
      82              : 
      83              : #[derive(Debug, thiserror::Error)]
      84              : pub enum GateError {
      85              :     #[error("gate is closed")]
      86              :     GateClosed,
      87              : }
      88              : 
      89              : impl GateError {
      90            0 :     pub fn is_cancel(&self) -> bool {
      91            0 :         match self {
      92            0 :             GateError::GateClosed => true,
      93              :         }
      94            0 :     }
      95              : }
      96              : 
      97              : impl Default for Gate {
      98          415 :     fn default() -> Self {
      99          415 :         Self {
     100          415 :             inner: Arc::new(GateInner {
     101          415 :                 sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
     102          415 :                 closing: AtomicBool::new(false),
     103          415 :             }),
     104          415 :         }
     105          415 :     }
     106              : }
     107              : 
     108              : impl Gate {
     109              :     const MAX_UNITS: u32 = u32::MAX;
     110              : 
     111              :     /// Acquire a guard that will prevent close() calls from completing. If close()
     112              :     /// was already called, this will return an error which should be interpreted
     113              :     /// as "shutting down".
     114              :     ///
     115              :     /// This function would typically be used from e.g. request handlers. While holding
     116              :     /// the guard returned from this function, it is important to respect a CancellationToken
     117              :     /// to avoid blocking close() indefinitely: typically types that contain a Gate will
     118              :     /// also contain a CancellationToken.
     119         4897 :     pub fn enter(&self) -> Result<GateGuard, GateError> {
     120         4897 :         Self::enter_impl(self.inner.clone())
     121         4897 :     }
     122              : 
     123         4900 :     fn enter_impl(gate: Arc<GateInner>) -> Result<GateGuard, GateError> {
     124         4900 :         let permit = gate.sem.try_acquire().map_err(|_| GateError::GateClosed)?;
     125              : 
     126              :         // we now have the permit, let's disable the normal RAII functionality and leave
     127              :         // "returning" the permit to our GateGuard::drop.
     128              :         //
     129              :         // this is done to avoid the need for multiple Arcs (one for the semaphore, another for the
     130              :         // other fields).
     131         4883 :         permit.forget();
     132              : 
     133         4883 :         Ok(GateGuard {
     134         4883 :             span_at_enter: tracing::Span::current(),
     135         4883 :             gate,
     136         4883 :         })
     137         4900 :     }
     138              : 
     139              :     /// Types with a shutdown() method and a gate should call this method at the
     140              :     /// end of shutdown, to ensure that all GateGuard holders are done.
     141              :     ///
     142              :     /// This will wait for all guards to be destroyed.  For this to complete promptly, it is
     143              :     /// important that the holders of such guards are respecting a CancellationToken which has
     144              :     /// been cancelled before entering this function.
     145           21 :     pub async fn close(&self) {
     146           21 :         let started_at = std::time::Instant::now();
     147           21 :         let mut do_close = std::pin::pin!(self.do_close());
     148              : 
     149              :         // with 1s we rarely saw anything; let's see whether 100ms surfaces more gate-closing reasons
     150           21 :         let nag_after = Duration::from_millis(100);
     151              : 
     152           21 :         let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
     153           14 :             return;
     154              :         };
     155              : 
     156            7 :         tracing::info!(
     157            0 :             gate = ?self.as_ptr(),
     158            0 :             elapsed_ms = started_at.elapsed().as_millis(),
     159            0 :             "closing is taking longer than expected"
     160              :         );
     161              : 
     162              :         // the close operation does not try to be cancellation safe, as the pageserver does not need it.
     163              :         //
     164              :         // note: "closing" is not checked in Gate::enter -- it exists just for observability:
     165              :         // any GateGuard dropped after this point will log from the span in which it was entered.
     166            7 :         self.inner.closing.store(true, Ordering::Relaxed);
     167              : 
     168            7 :         do_close.await;
     169              : 
     170            3 :         tracing::info!(
     171            0 :             gate = ?self.as_ptr(),
     172            0 :             elapsed_ms = started_at.elapsed().as_millis(),
     173            0 :             "close completed"
     174              :         );
     175           17 :     }
     176              : 
     177              :     /// Used as an identity of a gate. This identity will be resolved to something useful when
     178              :     /// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
     179              :     /// more.
     180              :     ///
     181              :     /// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
     182              :     /// open for too long.
     183            9 :     fn as_ptr(&self) -> *const GateInner {
     184            9 :         Arc::as_ptr(&self.inner)
     185            9 :     }
     186              : 
     187              :     /// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish.  This
     188              :     /// is usually analogous to "Did shutdown finish?" for types that include a Gate, whereas checking
     189              :     /// the CancellationToken on such types is analogous to "Did shutdown start?"
     190            1 :     pub fn close_complete(&self) -> bool {
     191            1 :         self.inner.sem.is_closed()
     192            1 :     }
     193              : 
     194              :     #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
     195              :     async fn do_close(&self) {
     196              :         tracing::debug!("Closing Gate...");
     197              : 
     198              :         match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
     199              :             Ok(_permit) => {
     200              :                 // While holding all units, close the semaphore.  All subsequent calls to enter() will fail.
     201              :                 self.inner.sem.close();
     202              :             }
     203              :             Err(_closed) => {
     204              :                 // Semaphore closed: we are the only function that can do this, so it indicates a double-call.
     205              :                 // This is legal.  Timeline::shutdown for example is not protected from being called more than
     206              :                 // once.
     207              :                 tracing::debug!("Double close")
     208              :             }
     209              :         }
     210              :         tracing::debug!("Closed Gate.")
     211              :     }
     212              : }
     213              : 
     214              : #[cfg(test)]
     215              : mod tests {
     216              :     use super::*;
     217              : 
     218              :     #[tokio::test]
     219            1 :     async fn close_unused() {
     220              :         // Having taken no guards, we should not be blocked in close
     221            1 :         let gate = Gate::default();
     222            1 :         gate.close().await;
     223            1 :     }
     224              : 
     225              :     #[tokio::test]
     226            1 :     async fn close_idle() {
     227              :         // If a guard is dropped before closing, close should not be blocked
     228            1 :         let gate = Gate::default();
     229            1 :         let guard = gate.enter().unwrap();
     230            1 :         drop(guard);
     231            1 :         gate.close().await;
     232              : 
     233              :         // Entering a closed gate fails
     234            1 :         gate.enter().expect_err("enter should fail after close");
     235            1 :     }
     236              : 
     237              :     #[tokio::test(start_paused = true)]
     238            1 :     async fn close_busy_gate() {
     239            1 :         let gate = Gate::default();
     240            1 :         let forever = Duration::from_secs(24 * 7 * 365);
     241              : 
     242            1 :         let guard =
     243            1 :             tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap());
     244              : 
     245            1 :         let mut close_fut = std::pin::pin!(gate.close());
     246              : 
     247              :         // Close should be waiting for guards to drop
     248            1 :         tokio::time::timeout(forever, &mut close_fut)
     249            1 :             .await
     250            1 :             .unwrap_err();
     251              : 
     252              :         // Attempting to enter() should fail, even though close isn't done yet.
     253            1 :         gate.enter()
     254            1 :             .expect_err("enter should fail after entering close");
     255              : 
     256              :         // this will now log, which we cannot verify except manually
     257            1 :         drop(guard);
     258              : 
     259              :         // Guard is gone, close should finish
     260            1 :         close_fut.await;
     261              : 
     262              :         // Attempting to enter() is still forbidden
     263            1 :         gate.enter().expect_err("enter should fail finishing close");
     264            1 :     }
     265              : 
     266              :     #[tokio::test(start_paused = true)]
     267            1 :     async fn clone_gate_guard() {
     268            1 :         let gate = Gate::default();
     269            1 :         let forever = Duration::from_secs(24 * 7 * 365);
     270              : 
     271            1 :         let guard1 = gate.enter().expect("gate isn't closed");
     272              : 
     273            1 :         let guard2 = guard1.try_clone().expect("gate isn't clsoed");
     274              : 
     275            1 :         let mut close_fut = std::pin::pin!(gate.close());
     276              : 
     277            1 :         tokio::time::timeout(forever, &mut close_fut)
     278            1 :             .await
     279            1 :             .unwrap_err();
     280              : 
     281              :         // we polled close_fut once, that should prevent all later enters and clones
     282            1 :         gate.enter().unwrap_err();
     283            1 :         guard1.try_clone().unwrap_err();
     284            1 :         guard2.try_clone().unwrap_err();
     285              : 
     286              :         // guard2 keeps the gate open even after guard1 is dropped
     287            1 :         drop(guard1);
     288            1 :         tokio::time::timeout(forever, &mut close_fut)
     289            1 :             .await
     290            1 :             .unwrap_err();
     291              : 
     292            1 :         drop(guard2);
     293              : 
     294              :         // now that the last guard is dropped, closing should complete
     295            1 :         close_fut.await;
     296              : 
     297              :         // entering is still forbidden
     298            1 :         gate.enter().expect_err("enter should stilll fail");
     299            1 :     }
     300              : }
        

Generated by: LCOV version 2.1-beta