Line data Source code
1 : //! Failpoint support code shared between pageserver and safekeepers.
2 :
3 : use crate::http::{
4 : error::ApiError,
5 : json::{json_request, json_response},
6 : };
7 : use hyper::{Body, Request, Response, StatusCode};
8 : use serde::{Deserialize, Serialize};
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::*;
11 :
/// Declare a failpoint that can use the `pause` failpoint action.
///
/// We don't want to block the executor thread, hence, spawn_blocking + await:
/// the failpoint itself is evaluated inside `tokio::task::spawn_blocking`,
/// with the caller's current tracing span entered so that the
/// "at failpoint" log line carries the caller's span context.
///
/// The macro expands to code that uses `.await`, so it can only be used in
/// an async context. The whole expansion is guarded by
/// `cfg!(feature = "testing")`; when that evaluates to `false` nothing is
/// spawned and, in the two-argument form, `$cond` is never evaluated.
///
/// Forms:
/// - `pausable_failpoint!("name")` — always evaluate the failpoint.
/// - `pausable_failpoint!("name", cond)` — evaluate it only if `cond` is true.
///
/// # Panics
///
/// Panics with the message `spawn_blocking` if the blocking task cannot be
/// joined (for example because the closure itself panicked).
#[macro_export]
macro_rules! pausable_failpoint {
    ($name:literal) => {
        if cfg!(feature = "testing") {
            tokio::task::spawn_blocking({
                // Capture the span on the async side; it is entered on the
                // blocking thread below.
                let current = tracing::Span::current();
                move || {
                    let _entered = current.entered();
                    tracing::info!("at failpoint {}", $name);
                    fail::fail_point!($name);
                }
            })
            .await
            .expect("spawn_blocking");
        }
    };
    ($name:literal, $cond:expr) => {
        if cfg!(feature = "testing") {
            if $cond {
                // `$crate::` keeps the recursive call resolvable even when the
                // caller invoked this macro through a path instead of
                // importing it by name.
                $crate::pausable_failpoint!($name)
            }
        }
    };
}
38 :
/// Async sleep failpoint; use with `fail::cfg("$name", "return(2000)")`.
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that the sleep is
/// performed by an async helper instead of blocking the thread. Another
/// difference is that the helper prints lines to the log, which can be useful
/// in tests to check that the failpoint was hit.
///
/// Optionally pass a cancellation token as a second argument, and this
/// failpoint will drop out of its sleep when the cancellation token fires.
/// This is useful for testing cases where we would like to block something,
/// but test its clean shutdown behavior.
///
/// The expansion uses `.await`, so it must be invoked from an async context.
#[macro_export]
macro_rules! __failpoint_sleep_millis_async {
    // Internal rule, not meant to be called directly.
    //
    // If the failpoint is used with a "return" action, this evaluates to the
    // returned value (as string). Otherwise it evaluates to None.
    (@returned $name:literal) => {
        (|| {
            ::fail::fail_point!($name, |returned| returned);
            ::std::option::Option::None
        })()
    };
    ($name:literal) => {{
        let requested_sleep = $crate::__failpoint_sleep_millis_async!(@returned $name);

        // Sleep only if the action handed us a value.
        if let ::std::option::Option::Some(duration_str) = requested_sleep {
            $crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await
        }
    }};
    ($name:literal, $cancel:expr) => {{
        let requested_sleep = $crate::__failpoint_sleep_millis_async!(@returned $name);

        // Sleep only if the action handed us a value; the helper wakes up
        // early when `$cancel` fires.
        if let ::std::option::Option::Some(duration_str) = requested_sleep {
            $crate::failpoint_support::failpoint_sleep_cancellable_helper(
                $name,
                duration_str,
                $cancel,
            )
            .await
        }
    }};
}
pub use __failpoint_sleep_millis_async as sleep_millis_async;
84 :
85 : // Helper function used by the macro. (A function has nicer scoping so we
86 : // don't need to decorate everything with "::")
87 : #[doc(hidden)]
88 0 : pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
89 0 : let millis = duration_str.parse::<u64>().unwrap();
90 0 : let d = std::time::Duration::from_millis(millis);
91 0 :
92 0 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
93 0 : tokio::time::sleep(d).await;
94 0 : tracing::info!("failpoint {:?}: sleep done", name);
95 0 : }
96 :
97 : // Helper function used by the macro. (A function has nicer scoping so we
98 : // don't need to decorate everything with "::")
99 : #[doc(hidden)]
100 0 : pub async fn failpoint_sleep_cancellable_helper(
101 0 : name: &'static str,
102 0 : duration_str: String,
103 0 : cancel: &CancellationToken,
104 0 : ) {
105 0 : let millis = duration_str.parse::<u64>().unwrap();
106 0 : let d = std::time::Duration::from_millis(millis);
107 0 :
108 0 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
109 0 : tokio::time::timeout(d, cancel.cancelled()).await.ok();
110 0 : tracing::info!("failpoint {:?}: sleep done", name);
111 0 : }
112 :
113 0 : pub fn init() -> fail::FailScenario<'static> {
114 0 : // The failpoints lib provides support for parsing the `FAILPOINTS` env var.
115 0 : // We want non-default behavior for `exit`, though, so, we handle it separately.
116 0 : //
117 0 : // Format for FAILPOINTS is "name=actions" separated by ";".
118 0 : let actions = std::env::var("FAILPOINTS");
119 0 : if actions.is_ok() {
120 0 : std::env::remove_var("FAILPOINTS");
121 0 : } else {
122 0 : // let the library handle non-utf8, or nothing for not present
123 0 : }
124 :
125 0 : let scenario = fail::FailScenario::setup();
126 :
127 0 : if let Ok(val) = actions {
128 0 : val.split(';')
129 0 : .enumerate()
130 0 : .map(|(i, s)| s.split_once('=').ok_or((i, s)))
131 0 : .for_each(|res| {
132 0 : let (name, actions) = match res {
133 0 : Ok(t) => t,
134 0 : Err((i, s)) => {
135 0 : panic!(
136 0 : "startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
137 0 : i + 1,
138 0 : );
139 : }
140 : };
141 0 : if let Err(e) = apply_failpoint(name, actions) {
142 0 : panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
143 0 : }
144 0 : });
145 0 : }
146 :
147 0 : scenario
148 0 : }
149 :
150 0 : pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
151 0 : if actions == "exit" {
152 0 : fail::cfg_callback(name, exit_failpoint)
153 : } else {
154 0 : fail::cfg(name, actions)
155 : }
156 0 : }
157 :
158 : #[inline(never)]
159 0 : fn exit_failpoint() {
160 0 : tracing::info!("Exit requested by failpoint");
161 0 : std::process::exit(1);
162 : }
163 :
164 : pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
165 :
166 : /// Information for configuring a single fail point
167 0 : #[derive(Debug, Serialize, Deserialize)]
168 : pub struct FailpointConfig {
169 : /// Name of the fail point
170 : pub name: String,
171 : /// List of actions to take, using the format described in `fail::cfg`
172 : ///
173 : /// We also support `actions = "exit"` to cause the fail point to immediately exit.
174 : pub actions: String,
175 : }
176 :
177 : /// Configure failpoints through http.
178 0 : pub async fn failpoints_handler(
179 0 : mut request: Request<Body>,
180 0 : _cancel: CancellationToken,
181 0 : ) -> Result<Response<Body>, ApiError> {
182 0 : if !fail::has_failpoints() {
183 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
184 0 : "Cannot manage failpoints because storage was compiled without failpoints support"
185 0 : )));
186 0 : }
187 :
188 0 : let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
189 0 : for fp in failpoints {
190 0 : info!("cfg failpoint: {} {}", fp.name, fp.actions);
191 :
192 : // We recognize one extra "action" that's not natively recognized
193 : // by the failpoints crate: exit, to immediately kill the process
194 0 : let cfg_result = apply_failpoint(&fp.name, &fp.actions);
195 :
196 0 : if let Err(err_msg) = cfg_result {
197 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
198 0 : "Failed to configure failpoints: {err_msg}"
199 0 : )));
200 0 : }
201 : }
202 :
203 0 : json_response(StatusCode::OK, ())
204 0 : }
|