Line data Source code
1 : //! Failpoint support code shared between pageserver and safekeepers.
2 :
3 : use crate::http::{
4 : error::ApiError,
5 : json::{json_request, json_response},
6 : };
7 : use hyper::{Body, Request, Response, StatusCode};
8 : use serde::{Deserialize, Serialize};
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::*;
11 :
/// Async, logging sleep that is driven by a failpoint's "return" action.
///
/// Configure it with e.g. `fail::cfg("$name", "return(2000)")`.
///
/// The effect resembles the built-in "sleep(2000)" action: execution pauses for
/// the given number of milliseconds. Unlike that action, the pause goes through
/// an awaited helper built on tokio's async timer, and log lines are written
/// around it, which tests can use to confirm that the failpoint was hit.
///
/// A cancellation token may be supplied as a second argument. In that form the
/// pause ends early as soon as the token fires, which helps when a test wants to
/// block some operation while still exercising its clean shutdown path.
#[macro_export]
macro_rules! __failpoint_sleep_millis_async {
    ($name:literal) => {{
        // Holds the value handed to the failpoint's "return" action (as a string),
        // or `None` when the failpoint is not configured with such an action.
        let requested_millis = (|| {
            ::fail::fail_point!($name, |payload| payload);
            ::std::option::Option::None
        })();

        // Only pause when the action actually returned a value.
        match requested_millis {
            ::std::option::Option::Some(millis_str) => {
                $crate::failpoint_support::failpoint_sleep_helper($name, millis_str).await
            }
            ::std::option::Option::None => {}
        }
    }};
    ($name:literal, $cancel:expr) => {{
        // Holds the value handed to the failpoint's "return" action (as a string),
        // or `None` when the failpoint is not configured with such an action.
        let requested_millis = (|| {
            ::fail::fail_point!($name, |payload| payload);
            ::std::option::Option::None
        })();

        // Only pause when the action actually returned a value; the helper stops
        // waiting early if `$cancel` fires.
        match requested_millis {
            ::std::option::Option::Some(millis_str) => {
                $crate::failpoint_support::failpoint_sleep_cancellable_helper(
                    $name,
                    millis_str,
                    $cancel,
                )
                .await
            }
            ::std::option::Option::None => {}
        }
    }};
}
56 : pub use __failpoint_sleep_millis_async as sleep_millis_async;
57 :
58 : // Helper function used by the macro. (A function has nicer scoping so we
59 : // don't need to decorate everything with "::")
60 : #[doc(hidden)]
61 4 : pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
62 4 : let millis = duration_str.parse::<u64>().unwrap();
63 4 : let d = std::time::Duration::from_millis(millis);
64 :
65 4 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
66 4 : tokio::time::sleep(d).await;
67 4 : tracing::info!("failpoint {:?}: sleep done", name);
68 4 : }
69 :
70 : // Helper function used by the macro. (A function has nicer scoping so we
71 : // don't need to decorate everything with "::")
72 : #[doc(hidden)]
73 3 : pub async fn failpoint_sleep_cancellable_helper(
74 3 : name: &'static str,
75 3 : duration_str: String,
76 3 : cancel: &CancellationToken,
77 3 : ) {
78 3 : let millis = duration_str.parse::<u64>().unwrap();
79 3 : let d = std::time::Duration::from_millis(millis);
80 :
81 3 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
82 3 : tokio::time::timeout(d, cancel.cancelled()).await.ok();
83 3 : tracing::info!("failpoint {:?}: sleep done", name);
84 3 : }
85 :
86 604 : pub fn init() -> fail::FailScenario<'static> {
87 604 : // The failpoints lib provides support for parsing the `FAILPOINTS` env var.
88 604 : // We want non-default behavior for `exit`, though, so, we handle it separately.
89 604 : //
90 604 : // Format for FAILPOINTS is "name=actions" separated by ";".
91 604 : let actions = std::env::var("FAILPOINTS");
92 604 : if actions.is_ok() {
93 8 : std::env::remove_var("FAILPOINTS");
94 596 : } else {
95 596 : // let the library handle non-utf8, or nothing for not present
96 596 : }
97 :
98 604 : let scenario = fail::FailScenario::setup();
99 :
100 604 : if let Ok(val) = actions {
101 8 : val.split(';')
102 8 : .enumerate()
103 9 : .map(|(i, s)| s.split_once('=').ok_or((i, s)))
104 9 : .for_each(|res| {
105 9 : let (name, actions) = match res {
106 9 : Ok(t) => t,
107 0 : Err((i, s)) => {
108 0 : panic!(
109 0 : "startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
110 0 : i + 1,
111 0 : );
112 : }
113 : };
114 9 : if let Err(e) = apply_failpoint(name, actions) {
115 0 : panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
116 9 : }
117 9 : });
118 596 : }
119 :
120 604 : scenario
121 604 : }
122 :
123 197 : pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
124 197 : if actions == "exit" {
125 4 : fail::cfg_callback(name, exit_failpoint)
126 : } else {
127 193 : fail::cfg(name, actions)
128 : }
129 197 : }
130 :
131 : #[inline(never)]
132 4 : fn exit_failpoint() {
133 4 : tracing::info!("Exit requested by failpoint");
134 4 : std::process::exit(1);
135 : }
136 :
137 : pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
138 :
139 : /// Information for configuring a single fail point
140 940 : #[derive(Debug, Serialize, Deserialize)]
141 : pub struct FailpointConfig {
142 : /// Name of the fail point
143 : pub name: String,
144 : /// List of actions to take, using the format described in `fail::cfg`
145 : ///
146 : /// We also support `actions = "exit"` to cause the fail point to immediately exit.
147 : pub actions: String,
148 : }
149 :
150 : /// Configure failpoints through http.
151 180 : pub async fn failpoints_handler(
152 180 : mut request: Request<Body>,
153 180 : _cancel: CancellationToken,
154 180 : ) -> Result<Response<Body>, ApiError> {
155 180 : if !fail::has_failpoints() {
156 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
157 0 : "Cannot manage failpoints because storage was compiled without failpoints support"
158 0 : )));
159 180 : }
160 :
161 180 : let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
162 368 : for fp in failpoints {
163 188 : info!("cfg failpoint: {} {}", fp.name, fp.actions);
164 :
165 : // We recognize one extra "action" that's not natively recognized
166 : // by the failpoints crate: exit, to immediately kill the process
167 188 : let cfg_result = apply_failpoint(&fp.name, &fp.actions);
168 :
169 188 : if let Err(err_msg) = cfg_result {
170 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
171 0 : "Failed to configure failpoints: {err_msg}"
172 0 : )));
173 188 : }
174 : }
175 :
176 180 : json_response(StatusCode::OK, ())
177 180 : }
|