LCOV - code coverage report
Current view: top level - libs/utils/src - failpoint_support.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 17.6 % 74 13
Test Date: 2025-03-12 00:01:28 Functions: 25.2 % 103 26

            Line data    Source code
       1              : //! Failpoint support code shared between pageserver and safekeepers.
       2              : 
       3              : use tokio_util::sync::CancellationToken;
       4              : 
       5              : /// Declare a failpoint that can be used with the `pause` failpoint action.
       6              : /// We don't want to block the executor thread, hence, spawn_blocking + await.
       7              : ///
       8              : /// Optionally pass a cancellation token, and this failpoint will drop out of
       9              : /// its pause when the cancellation token fires. This is useful for testing
      10              : /// cases where we would like to block something, but test its clean shutdown behavior.
      11              : /// The macro evaluates to a Result in that case, where Ok(()) is the case
      12              : /// where the failpoint was not paused, and Err() is the case where cancellation
      13              : /// token fired while evaluating the failpoint.
      14              : ///
      15              : /// Remember to unpause the failpoint in the test; until that happens, one of the
      16              : /// limited number of spawn_blocking thread pool threads is leaked.
      17              : #[macro_export]
      18              : macro_rules! pausable_failpoint {
      19              :     ($name:literal) => {{
      20              :         if cfg!(feature = "testing") {
      21              :             let cancel = ::tokio_util::sync::CancellationToken::new();
      22              :             let _ = $crate::pausable_failpoint!($name, &cancel);
      23              :         }
      24              :     }};
      25              :     ($name:literal, $cancel:expr) => {{
      26              :         if cfg!(feature = "testing") {
      27              :             let failpoint_fut = ::tokio::task::spawn_blocking({
      28              :                 let current = ::tracing::Span::current();
      29        12718 :                 move || {
      30        12718 :                     let _entered = current.entered();
      31        12718 :                     ::tracing::info!("at failpoint {}", $name);
      32        12718 :                     ::fail::fail_point!($name);
      33        12718 :                 }
      34              :             });
      35        12222 :             let cancel_fut = async move {
      36        12222 :                 $cancel.cancelled().await;
      37              :             };
      38              :             ::tokio::select! {
      39              :                 res = failpoint_fut => {
      40              :                     res.expect("spawn_blocking");
      41              :                     // continue with execution
      42              :                     Ok(())
      43              :                 },
      44              :                 _ = cancel_fut => {
      45              :                     Err(())
      46              :                 }
      47              :             }
      48              :         } else {
      49              :             Ok(())
      50              :         }
      51              :     }};
      52              : }
      53              : 
      54              : pub use pausable_failpoint;
      55              : 
      56              : /// Use with `fail::cfg("$name", "return(2000)")`.
      57              : ///
      58              : /// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
      59              : /// specified time (in milliseconds). The main difference is that we use async
      60              : /// tokio sleep function. Another difference is that we print lines to the log,
      61              : /// which can be useful in tests to check that the failpoint was hit.
      62              : ///
      63              : /// Optionally pass a cancellation token, and this failpoint will drop out of
      64              : /// its sleep when the cancellation token fires.  This is useful for testing
      65              : /// cases where we would like to block something, but test its clean shutdown behavior.
      66              : #[macro_export]
      67              : macro_rules! __failpoint_sleep_millis_async {
      68              :     ($name:literal) => {{
      69              :         // If the failpoint is used with a "return" action, set should_sleep to the
      70              :         // returned value (as string). Otherwise it's set to None.
      71       292164 :         let should_sleep = (|| {
      72       292164 :             ::fail::fail_point!($name, |x| x);
      73       292164 :             ::std::option::Option::None
      74              :         })();
      75              : 
      76              :         // Sleep if the action was a returned value
      77              :         if let ::std::option::Option::Some(duration_str) = should_sleep {
      78              :             $crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await
      79              :         }
      80              :     }};
      81              :     ($name:literal, $cancel:expr) => {{
      82              :         // If the failpoint is used with a "return" action, set should_sleep to the
      83              :         // returned value (as string). Otherwise it's set to None.
      84          452 :         let should_sleep = (|| {
      85          452 :             ::fail::fail_point!($name, |x| x);
      86          452 :             ::std::option::Option::None
      87              :         })();
      88              : 
      89              :         // Sleep if the action was a returned value
      90              :         if let ::std::option::Option::Some(duration_str) = should_sleep {
      91              :             $crate::failpoint_support::failpoint_sleep_cancellable_helper(
      92              :                 $name,
      93              :                 duration_str,
      94              :                 $cancel,
      95              :             )
      96              :             .await
      97              :         }
      98              :     }};
      99              : }
     100              : pub use __failpoint_sleep_millis_async as sleep_millis_async;
     101              : 
     102              : // Helper function used by the macro. (A function has nicer scoping so we
     103              : // don't need to decorate everything with "::")
     104              : #[doc(hidden)]
     105            0 : pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
     106            0 :     let millis = duration_str.parse::<u64>().unwrap();
     107            0 :     let d = std::time::Duration::from_millis(millis);
     108            0 : 
     109            0 :     tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
     110            0 :     tokio::time::sleep(d).await;
     111            0 :     tracing::info!("failpoint {:?}: sleep done", name);
     112            0 : }
     113              : 
     114              : // Helper function used by the macro. (A function has nicer scoping so we
     115              : // don't need to decorate everything with "::")
     116              : #[doc(hidden)]
     117            0 : pub async fn failpoint_sleep_cancellable_helper(
     118            0 :     name: &'static str,
     119            0 :     duration_str: String,
     120            0 :     cancel: &CancellationToken,
     121            0 : ) {
     122            0 :     let millis = duration_str.parse::<u64>().unwrap();
     123            0 :     let d = std::time::Duration::from_millis(millis);
     124            0 : 
     125            0 :     tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
     126            0 :     tokio::time::timeout(d, cancel.cancelled()).await.ok();
     127            0 :     tracing::info!("failpoint {:?}: sleep done", name);
     128            0 : }
     129              : 
     130              : /// Initialize the configured failpoints
     131              : ///
     132              : /// You must call this function before any concurrent threads do operations.
     133            0 : pub fn init() -> fail::FailScenario<'static> {
     134            0 :     // The failpoints lib provides support for parsing the `FAILPOINTS` env var.
     135            0 :     // We want non-default behavior for `exit`, though, so, we handle it separately.
     136            0 :     //
     137            0 :     // Format for FAILPOINTS is "name=actions" separated by ";".
     138            0 :     let actions = std::env::var("FAILPOINTS");
     139            0 :     if actions.is_ok() {
     140              :         // SAFETY: this function should be called before any threads start and access env vars concurrently
     141            0 :         unsafe {
     142            0 :             std::env::remove_var("FAILPOINTS");
     143            0 :         }
     144            0 :     } else {
     145            0 :         // let the library handle non-utf8, or nothing for not present
     146            0 :     }
     147              : 
     148            0 :     let scenario = fail::FailScenario::setup();
     149              : 
     150            0 :     if let Ok(val) = actions {
     151            0 :         val.split(';')
     152            0 :             .enumerate()
     153            0 :             .map(|(i, s)| s.split_once('=').ok_or((i, s)))
     154            0 :             .for_each(|res| {
     155            0 :                 let (name, actions) = match res {
     156            0 :                     Ok(t) => t,
     157            0 :                     Err((i, s)) => {
     158            0 :                         panic!(
     159            0 :                             "startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
     160            0 :                             i + 1,
     161            0 :                         );
     162              :                     }
     163              :                 };
     164            0 :                 if let Err(e) = apply_failpoint(name, actions) {
     165            0 :                     panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
     166            0 :                 }
     167            0 :             });
     168            0 :     }
     169              : 
     170            0 :     scenario
     171            0 : }
     172              : 
     173            0 : pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
     174            0 :     if actions == "exit" {
     175            0 :         fail::cfg_callback(name, exit_failpoint)
     176              :     } else {
     177            0 :         fail::cfg(name, actions)
     178              :     }
     179            0 : }
     180              : 
     181              : #[inline(never)]
     182            0 : fn exit_failpoint() {
     183            0 :     tracing::info!("Exit requested by failpoint");
     184            0 :     std::process::exit(1);
     185              : }
        

Generated by: LCOV version 2.1-beta