LCOV - code coverage report
Current view: top level - libs/utils/src - failpoint_support.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 14.0 % 93 13
Test Date: 2025-01-30 15:18:43 Functions: 23.2 % 112 26

            Line data    Source code
       1              : //! Failpoint support code shared between pageserver and safekeepers.
       2              : 
       3              : use crate::http::{
       4              :     error::ApiError,
       5              :     json::{json_request, json_response},
       6              : };
       7              : use hyper::{Body, Request, Response, StatusCode};
       8              : use serde::{Deserialize, Serialize};
       9              : use tokio_util::sync::CancellationToken;
      10              : use tracing::*;
      11              : 
      12              : /// Declare a failpoint that can use to `pause` failpoint action.
      13              : /// We don't want to block the executor thread, hence, spawn_blocking + await.
      14              : ///
      15              : /// Optionally pass a cancellation token, and this failpoint will drop out of
      16              : /// its pause when the cancellation token fires. This is useful for testing
      17              : /// cases where we would like to block something, but test its clean shutdown behavior.
      18              : /// The macro evaluates to a Result in that case, where Ok(()) is the case
      19              : /// where the failpoint was not paused, and Err() is the case where cancellation
      20              : /// token fired while evaluating the failpoint.
      21              : ///
      22              : /// Remember to unpause the failpoint in the test; until that happens, one of the
      23              : /// limited number of spawn_blocking thread pool threads is leaked.
      24              : #[macro_export]
      25              : macro_rules! pausable_failpoint {
      26              :     ($name:literal) => {{
      27              :         if cfg!(feature = "testing") {
      28              :             let cancel = ::tokio_util::sync::CancellationToken::new();
      29              :             let _ = $crate::pausable_failpoint!($name, &cancel);
      30              :         }
      31              :     }};
      32              :     ($name:literal, $cancel:expr) => {{
      33              :         if cfg!(feature = "testing") {
      34              :             let failpoint_fut = ::tokio::task::spawn_blocking({
      35              :                 let current = ::tracing::Span::current();
      36        12591 :                 move || {
      37        12591 :                     let _entered = current.entered();
      38        12591 :                     ::tracing::info!("at failpoint {}", $name);
      39        12591 :                     ::fail::fail_point!($name);
      40        12591 :                 }
      41              :             });
      42        12205 :             let cancel_fut = async move {
      43        12205 :                 $cancel.cancelled().await;
      44              :             };
      45              :             ::tokio::select! {
      46              :                 res = failpoint_fut => {
      47              :                     res.expect("spawn_blocking");
      48              :                     // continue with execution
      49              :                     Ok(())
      50              :                 },
      51              :                 _ = cancel_fut => {
      52              :                     Err(())
      53              :                 }
      54              :             }
      55              :         } else {
      56              :             Ok(())
      57              :         }
      58              :     }};
      59              : }
      60              : 
      61              : pub use pausable_failpoint;
      62              : 
      63              : /// use with fail::cfg("$name", "return(2000)")
      64              : ///
      65              : /// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
      66              : /// specified time (in milliseconds). The main difference is that we use async
      67              : /// tokio sleep function. Another difference is that we print lines to the log,
      68              : /// which can be useful in tests to check that the failpoint was hit.
      69              : ///
      70              : /// Optionally pass a cancellation token, and this failpoint will drop out of
      71              : /// its sleep when the cancellation token fires.  This is useful for testing
      72              : /// cases where we would like to block something, but test its clean shutdown behavior.
      73              : #[macro_export]
      74              : macro_rules! __failpoint_sleep_millis_async {
      75              :     ($name:literal) => {{
      76              :         // If the failpoint is used with a "return" action, set should_sleep to the
      77              :         // returned value (as string). Otherwise it's set to None.
      78       292152 :         let should_sleep = (|| {
      79       292152 :             ::fail::fail_point!($name, |x| x);
      80       292152 :             ::std::option::Option::None
      81              :         })();
      82              : 
      83              :         // Sleep if the action was a returned value
      84              :         if let ::std::option::Option::Some(duration_str) = should_sleep {
      85              :             $crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await
      86              :         }
      87              :     }};
      88              :     ($name:literal, $cancel:expr) => {{
      89              :         // If the failpoint is used with a "return" action, set should_sleep to the
      90              :         // returned value (as string). Otherwise it's set to None.
      91          440 :         let should_sleep = (|| {
      92          440 :             ::fail::fail_point!($name, |x| x);
      93          440 :             ::std::option::Option::None
      94              :         })();
      95              : 
      96              :         // Sleep if the action was a returned value
      97              :         if let ::std::option::Option::Some(duration_str) = should_sleep {
      98              :             $crate::failpoint_support::failpoint_sleep_cancellable_helper(
      99              :                 $name,
     100              :                 duration_str,
     101              :                 $cancel,
     102              :             )
     103              :             .await
     104              :         }
     105              :     }};
     106              : }
     107              : pub use __failpoint_sleep_millis_async as sleep_millis_async;
     108              : 
     109              : // Helper function used by the macro. (A function has nicer scoping so we
     110              : // don't need to decorate everything with "::")
     111              : #[doc(hidden)]
     112            0 : pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
     113            0 :     let millis = duration_str.parse::<u64>().unwrap();
     114            0 :     let d = std::time::Duration::from_millis(millis);
     115            0 : 
     116            0 :     tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
     117            0 :     tokio::time::sleep(d).await;
     118            0 :     tracing::info!("failpoint {:?}: sleep done", name);
     119            0 : }
     120              : 
     121              : // Helper function used by the macro. (A function has nicer scoping so we
     122              : // don't need to decorate everything with "::")
     123              : #[doc(hidden)]
     124            0 : pub async fn failpoint_sleep_cancellable_helper(
     125            0 :     name: &'static str,
     126            0 :     duration_str: String,
     127            0 :     cancel: &CancellationToken,
     128            0 : ) {
     129            0 :     let millis = duration_str.parse::<u64>().unwrap();
     130            0 :     let d = std::time::Duration::from_millis(millis);
     131            0 : 
     132            0 :     tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
     133            0 :     tokio::time::timeout(d, cancel.cancelled()).await.ok();
     134            0 :     tracing::info!("failpoint {:?}: sleep done", name);
     135            0 : }
     136              : 
     137            0 : pub fn init() -> fail::FailScenario<'static> {
     138            0 :     // The failpoints lib provides support for parsing the `FAILPOINTS` env var.
     139            0 :     // We want non-default behavior for `exit`, though, so, we handle it separately.
     140            0 :     //
     141            0 :     // Format for FAILPOINTS is "name=actions" separated by ";".
     142            0 :     let actions = std::env::var("FAILPOINTS");
     143            0 :     if actions.is_ok() {
     144            0 :         std::env::remove_var("FAILPOINTS");
     145            0 :     } else {
     146            0 :         // let the library handle non-utf8, or nothing for not present
     147            0 :     }
     148              : 
     149            0 :     let scenario = fail::FailScenario::setup();
     150              : 
     151            0 :     if let Ok(val) = actions {
     152            0 :         val.split(';')
     153            0 :             .enumerate()
     154            0 :             .map(|(i, s)| s.split_once('=').ok_or((i, s)))
     155            0 :             .for_each(|res| {
     156            0 :                 let (name, actions) = match res {
     157            0 :                     Ok(t) => t,
     158            0 :                     Err((i, s)) => {
     159            0 :                         panic!(
     160            0 :                             "startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
     161            0 :                             i + 1,
     162            0 :                         );
     163              :                     }
     164              :                 };
     165            0 :                 if let Err(e) = apply_failpoint(name, actions) {
     166            0 :                     panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
     167            0 :                 }
     168            0 :             });
     169            0 :     }
     170              : 
     171            0 :     scenario
     172            0 : }
     173              : 
     174            0 : pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
     175            0 :     if actions == "exit" {
     176            0 :         fail::cfg_callback(name, exit_failpoint)
     177              :     } else {
     178            0 :         fail::cfg(name, actions)
     179              :     }
     180            0 : }
     181              : 
     182              : #[inline(never)]
     183            0 : fn exit_failpoint() {
     184            0 :     tracing::info!("Exit requested by failpoint");
     185            0 :     std::process::exit(1);
     186              : }
     187              : 
     188              : pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
     189              : 
     190              : /// Information for configuring a single fail point
     191            0 : #[derive(Debug, Serialize, Deserialize)]
     192              : pub struct FailpointConfig {
     193              :     /// Name of the fail point
     194              :     pub name: String,
     195              :     /// List of actions to take, using the format described in `fail::cfg`
     196              :     ///
     197              :     /// We also support `actions = "exit"` to cause the fail point to immediately exit.
     198              :     pub actions: String,
     199              : }
     200              : 
     201              : /// Configure failpoints through http.
     202            0 : pub async fn failpoints_handler(
     203            0 :     mut request: Request<Body>,
     204            0 :     _cancel: CancellationToken,
     205            0 : ) -> Result<Response<Body>, ApiError> {
     206            0 :     if !fail::has_failpoints() {
     207            0 :         return Err(ApiError::BadRequest(anyhow::anyhow!(
     208            0 :             "Cannot manage failpoints because neon was compiled without failpoints support"
     209            0 :         )));
     210            0 :     }
     211              : 
     212            0 :     let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
     213            0 :     for fp in failpoints {
     214            0 :         info!("cfg failpoint: {} {}", fp.name, fp.actions);
     215              : 
     216              :         // We recognize one extra "action" that's not natively recognized
     217              :         // by the failpoints crate: exit, to immediately kill the process
     218            0 :         let cfg_result = apply_failpoint(&fp.name, &fp.actions);
     219              : 
     220            0 :         if let Err(err_msg) = cfg_result {
     221            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
     222            0 :                 "Failed to configure failpoints: {err_msg}"
     223            0 :             )));
     224            0 :         }
     225              :     }
     226              : 
     227            0 :     json_response(StatusCode::OK, ())
     228            0 : }
        

Generated by: LCOV version 2.1-beta