TLA Line data Source code
1 : //! Failpoint support code shared between pageserver and safekeepers.
2 :
3 : use crate::http::{
4 : error::ApiError,
5 : json::{json_request, json_response},
6 : };
7 : use hyper::{Body, Request, Response, StatusCode};
8 : use serde::{Deserialize, Serialize};
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::*;
11 :
/// Async sleep driven by a failpoint; use with `fail::cfg("$name", "return(2000)")`.
///
/// The effect resembles a "sleep(2000)" action: execution pauses for the given
/// number of milliseconds. Unlike that action, the pause goes through the async
/// tokio sleep function, and log lines are printed around it, which tests can
/// use to check that the failpoint was hit.
///
/// Must be invoked from an async context, because the expansion contains an
/// `.await`.
#[macro_export]
macro_rules! __failpoint_sleep_millis_async {
    ($name:literal) => {{
        // A "return" action on the failpoint hands its argument (as a string)
        // back out of the closure; in every other case the closure falls
        // through and yields None.
        let requested_sleep = (|| {
            ::fail::fail_point!($name, |arg| arg);
            ::std::option::Option::None
        })();

        // Only a returned value triggers the sleep.
        match requested_sleep {
            ::std::option::Option::Some(duration_str) => {
                $crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await
            }
            ::std::option::Option::None => {}
        }
    }};
}
pub use __failpoint_sleep_millis_async as sleep_millis_async;
35 :
36 : // Helper function used by the macro. (A function has nicer scoping so we
37 : // don't need to decorate everything with "::")
38 : #[doc(hidden)]
39 CBC 7 : pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
40 7 : let millis = duration_str.parse::<u64>().unwrap();
41 7 : let d = std::time::Duration::from_millis(millis);
42 :
43 7 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
44 7 : tokio::time::sleep(d).await;
45 7 : tracing::info!("failpoint {:?}: sleep done", name);
46 7 : }
47 :
48 557 : pub fn init() -> fail::FailScenario<'static> {
49 557 : // The failpoints lib provides support for parsing the `FAILPOINTS` env var.
50 557 : // We want non-default behavior for `exit`, though, so, we handle it separately.
51 557 : //
52 557 : // Format for FAILPOINTS is "name=actions" separated by ";".
53 557 : let actions = std::env::var("FAILPOINTS");
54 557 : if actions.is_ok() {
55 12 : std::env::remove_var("FAILPOINTS");
56 545 : } else {
57 545 : // let the library handle non-utf8, or nothing for not present
58 545 : }
59 :
60 557 : let scenario = fail::FailScenario::setup();
61 :
62 557 : if let Ok(val) = actions {
63 12 : val.split(';')
64 12 : .enumerate()
65 12 : .map(|(i, s)| s.split_once('=').ok_or((i, s)))
66 12 : .for_each(|res| {
67 12 : let (name, actions) = match res {
68 12 : Ok(t) => t,
69 UBC 0 : Err((i, s)) => {
70 0 : panic!(
71 0 : "startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
72 0 : i + 1,
73 0 : );
74 : }
75 : };
76 CBC 12 : if let Err(e) = apply_failpoint(name, actions) {
77 UBC 0 : panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
78 CBC 12 : }
79 12 : });
80 545 : }
81 :
82 557 : scenario
83 557 : }
84 :
85 193 : pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
86 193 : if actions == "exit" {
87 10 : fail::cfg_callback(name, exit_failpoint)
88 : } else {
89 183 : fail::cfg(name, actions)
90 : }
91 193 : }
92 :
93 : #[inline(never)]
94 9 : fn exit_failpoint() {
95 9 : tracing::info!("Exit requested by failpoint");
96 9 : std::process::exit(1);
97 : }
98 :
99 : pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
100 :
101 : /// Information for configuring a single fail point
102 905 : #[derive(Debug, Serialize, Deserialize)]
103 : pub struct FailpointConfig {
104 : /// Name of the fail point
105 : pub name: String,
106 : /// List of actions to take, using the format described in `fail::cfg`
107 : ///
108 : /// We also support `actions = "exit"` to cause the fail point to immediately exit.
109 : pub actions: String,
110 : }
111 :
112 : /// Configure failpoints through http.
113 174 : pub async fn failpoints_handler(
114 174 : mut request: Request<Body>,
115 174 : _cancel: CancellationToken,
116 174 : ) -> Result<Response<Body>, ApiError> {
117 174 : if !fail::has_failpoints() {
118 UBC 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
119 0 : "Cannot manage failpoints because storage was compiled without failpoints support"
120 0 : )));
121 CBC 174 : }
122 :
123 174 : let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
124 355 : for fp in failpoints {
125 181 : info!("cfg failpoint: {} {}", fp.name, fp.actions);
126 :
127 : // We recognize one extra "action" that's not natively recognized
128 : // by the failpoints crate: exit, to immediately kill the process
129 181 : let cfg_result = apply_failpoint(&fp.name, &fp.actions);
130 :
131 181 : if let Err(err_msg) = cfg_result {
132 UBC 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
133 0 : "Failed to configure failpoints: {err_msg}"
134 0 : )));
135 CBC 181 : }
136 : }
137 :
138 174 : json_response(StatusCode::OK, ())
139 174 : }
|