LCOV - code coverage report
Current view: top level - libs/utils/src/sync - gate.rs (source / functions) Coverage Total Hit
Test: 75747cdbffeb0b6d2a2a311584368de68cd9aadc.info Lines: 74.2 % 124 92
Test Date: 2024-06-24 06:52:57 Functions: 85.7 % 21 18

            Line data    Source code
       1              : use std::{
       2              :     sync::{
       3              :         atomic::{AtomicBool, Ordering},
       4              :         Arc,
       5              :     },
       6              :     time::Duration,
       7              : };
       8              : 
       9              : /// Gates are a concurrency helper, primarily used for implementing safe shutdown.
      10              : ///
      11              : /// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
      12              : /// the resource calls `close()` when they want to ensure that all holders of guards
      13              : /// have released them, and that no future guards will be issued.
      14              : pub struct Gate {
      15              :     inner: Arc<GateInner>,
      16              : }
      17              : 
      18              : impl std::fmt::Debug for Gate {
      19            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      20            0 :         f.debug_struct("Gate")
      21            0 :             // use this for identification
      22            0 :             .field("ptr", &Arc::as_ptr(&self.inner))
      23            0 :             .field("inner", &self.inner)
      24            0 :             .finish()
      25            0 :     }
      26              : }
      27              : 
      28              : struct GateInner {
      29              :     sem: tokio::sync::Semaphore,
      30              :     closing: std::sync::atomic::AtomicBool,
      31              : }
      32              : 
      33              : impl std::fmt::Debug for GateInner {
      34            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      35            0 :         let avail = self.sem.available_permits();
      36            0 : 
      37            0 :         let guards = u32::try_from(avail)
      38            0 :             .ok()
      39            0 :             // the sem only supports 32-bit ish amount, but let's play it safe
      40            0 :             .and_then(|x| Gate::MAX_UNITS.checked_sub(x));
      41            0 : 
      42            0 :         let closing = self.closing.load(Ordering::Relaxed);
      43              : 
      44            0 :         if let Some(guards) = guards {
      45            0 :             f.debug_struct("Gate")
      46            0 :                 .field("remaining_guards", &guards)
      47            0 :                 .field("closing", &closing)
      48            0 :                 .finish()
      49              :         } else {
      50            0 :             f.debug_struct("Gate")
      51            0 :                 .field("avail_permits", &avail)
      52            0 :                 .field("closing", &closing)
      53            0 :                 .finish()
      54              :         }
      55            0 :     }
      56              : }
      57              : 
      58              : /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
      59              : /// not complete.
      60              : #[derive(Debug)]
      61              : pub struct GateGuard {
      62              :     // Record the span where the gate was entered, so that we can identify who was blocking Gate::close
      63              :     span_at_enter: tracing::Span,
      64              :     gate: Arc<GateInner>,
      65              : }
      66              : 
      67              : impl Drop for GateGuard {
      68          605 :     fn drop(&mut self) {
      69          605 :         if self.gate.closing.load(Ordering::Relaxed) {
      70            2 :             self.span_at_enter.in_scope(
      71            2 :                 || tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
      72            2 :             );
      73          603 :         }
      74              : 
      75              :         // when the permit was acquired, it was forgotten to allow us to manage its lifecycle
      76              :         // manually, so "return" the permit now.
      77          605 :         self.gate.sem.add_permits(1);
      78          605 :     }
      79              : }
      80              : 
      81              : #[derive(Debug)]
      82              : pub enum GateError {
      83              :     GateClosed,
      84              : }
      85              : 
      86              : impl Default for Gate {
      87          544 :     fn default() -> Self {
      88          544 :         Self {
      89          544 :             inner: Arc::new(GateInner {
      90          544 :                 sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
      91          544 :                 closing: AtomicBool::new(false),
      92          544 :             }),
      93          544 :         }
      94          544 :     }
      95              : }
      96              : 
      97              : impl Gate {
      98              :     const MAX_UNITS: u32 = u32::MAX;
      99              : 
     100              :     /// Acquire a guard that will prevent close() calls from completing. If close()
     101              :     /// was already called, this will return an error which should be interpreted
     102              :     /// as "shutting down".
     103              :     ///
     104              :     /// This function would typically be used from e.g. request handlers. While holding
     105              :     /// the guard returned from this function, it is important to respect a CancellationToken
     106              :     /// to avoid blocking close() indefinitely: typically types that contain a Gate will
     107              :     /// also contain a CancellationToken.
     108          975 :     pub fn enter(&self) -> Result<GateGuard, GateError> {
     109          975 :         let permit = self
     110          975 :             .inner
     111          975 :             .sem
     112          975 :             .try_acquire()
     113          975 :             .map_err(|_| GateError::GateClosed)?;
     114              : 
     115              :         // we now have the permit, let's disable the normal raii functionality and leave
     116              :         // "returning" the permit to our GateGuard::drop.
     117              :         //
     118              :         // this is done to avoid the need for multiple Arcs (one for semaphore, next for other
     119              :         // fields).
     120          969 :         permit.forget();
     121          969 : 
     122          969 :         Ok(GateGuard {
     123          969 :             span_at_enter: tracing::Span::current(),
     124          969 :             gate: self.inner.clone(),
     125          969 :         })
     126          975 :     }
     127              : 
     128              :     /// Types with a shutdown() method and a gate should call this method at the
     129              :     /// end of shutdown, to ensure that all GateGuard holders are done.
     130              :     ///
     131              :     /// This will wait for all guards to be destroyed.  For this to complete promptly, it is
     132              :     /// important that the holders of such guards are respecting a CancellationToken which has
     133              :     /// been cancelled before entering this function.
     134           20 :     pub async fn close(&self) {
     135           20 :         let started_at = std::time::Instant::now();
     136           20 :         let mut do_close = std::pin::pin!(self.do_close());
     137           20 : 
     138           20 :         // with 1s we rarely saw anything, let's try if we get more gate closing reasons with 100ms
     139           20 :         let nag_after = Duration::from_millis(100);
     140              : 
     141           20 :         let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
     142           18 :             return;
     143              :         };
     144              : 
     145            2 :         tracing::info!(
     146            0 :             gate = ?self.as_ptr(),
     147            0 :             elapsed_ms = started_at.elapsed().as_millis(),
     148            0 :             "closing is taking longer than expected"
     149              :         );
     150              : 
     151              :         // close operation is not trying to be cancellation safe as pageserver does not need it.
     152              :         //
     153              :         // note: "closing" is not checked in Gate::enter -- it exists just for observability,
     154              :         // dropping of GateGuard after this will log who they were.
     155            2 :         self.inner.closing.store(true, Ordering::Relaxed);
     156            2 : 
     157            4 :         do_close.await;
     158              : 
     159            2 :         tracing::info!(
     160            0 :             gate = ?self.as_ptr(),
     161            0 :             elapsed_ms = started_at.elapsed().as_millis(),
     162            0 :             "close completed"
     163              :         );
     164           20 :     }
     165              : 
     166              :     /// Used as an identity of a gate. This identity will be resolved to something useful when
     167              :     /// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
     168              :     /// more.
     169              :     ///
     170              :     /// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
     171              :     /// open for too long.
     172            6 :     fn as_ptr(&self) -> *const GateInner {
     173            6 :         Arc::as_ptr(&self.inner)
     174            6 :     }
     175              : 
     176              :     /// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish.  This
     177              :     /// is usually analogous to "Did shutdown finish?" for types that include a Gate, whereas checking
     178              :     /// the CancellationToken on such types is analogous to "Did shutdown start?"
     179            2 :     pub fn close_complete(&self) -> bool {
     180            2 :         self.inner.sem.is_closed()
     181            2 :     }
     182              : 
     183           40 :     #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
     184              :     async fn do_close(&self) {
     185              :         tracing::debug!("Closing Gate...");
     186              : 
     187              :         match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
     188              :             Ok(_permit) => {
     189              :                 // While holding all units, close the semaphore.  All subsequent calls to enter() will fail.
     190              :                 self.inner.sem.close();
     191              :             }
     192              :             Err(_closed) => {
     193              :                 // Semaphore closed: we are the only function that can do this, so it indicates a double-call.
     194              :                 // This is legal.  Timeline::shutdown for example is not protected from being called more than
     195              :                 // once.
     196              :                 tracing::debug!("Double close")
     197              :             }
     198              :         }
     199              :         tracing::debug!("Closed Gate.")
     200              :     }
     201              : }
     202              : 
     203              : #[cfg(test)]
     204              : mod tests {
     205              :     use super::*;
     206              : 
     207              :     #[tokio::test]
     208            2 :     async fn close_unused() {
     209            2 :         // Having taken no guards, we should not be blocked in close
     210            2 :         let gate = Gate::default();
     211            2 :         gate.close().await;
     212            2 :     }
     213              : 
     214              :     #[tokio::test]
     215            2 :     async fn close_idle() {
     216            2 :         // If a guard is dropped before closing, close should not be blocked
     217            2 :         let gate = Gate::default();
     218            2 :         let guard = gate.enter().unwrap();
     219            2 :         drop(guard);
     220            2 :         gate.close().await;
     221            2 : 
     222            2 :         // Entering a closed gate fails
     223            2 :         gate.enter().expect_err("enter should fail after close");
     224            2 :     }
     225              : 
     226              :     #[tokio::test(start_paused = true)]
     227            2 :     async fn close_busy_gate() {
     228            2 :         let gate = Gate::default();
     229            2 :         let forever = Duration::from_secs(24 * 7 * 365);
     230            2 : 
     231            2 :         let guard =
     232            2 :             tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap());
     233            2 : 
     234            2 :         let mut close_fut = std::pin::pin!(gate.close());
     235            2 : 
     236            2 :         // Close should be waiting for guards to drop
     237            2 :         tokio::time::timeout(forever, &mut close_fut)
     238            4 :             .await
     239            2 :             .unwrap_err();
     240            2 : 
     241            2 :         // Attempting to enter() should fail, even though close isn't done yet.
     242            2 :         gate.enter()
     243            2 :             .expect_err("enter should fail after entering close");
     244            2 : 
     245            2 :         // this will now log, which we cannot verify except manually
     246            2 :         drop(guard);
     247            2 : 
     248            2 :         // Guard is gone, close should finish
     249            2 :         close_fut.await;
     250            2 : 
     251            2 :         // Attempting to enter() is still forbidden
     252            2 :         gate.enter().expect_err("enter should fail finishing close");
     253            2 :     }
     254              : }
        

Generated by: LCOV version 2.1-beta