Line data Source code
1 : //! Failpoint support code shared between pageserver and safekeepers.
2 :
3 : use crate::http::{
4 : error::ApiError,
5 : json::{json_request, json_response},
6 : };
7 : use hyper::{Body, Request, Response, StatusCode};
8 : use serde::{Deserialize, Serialize};
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::*;
11 :
12 : /// Declare a failpoint that can use to `pause` failpoint action.
13 : /// We don't want to block the executor thread, hence, spawn_blocking + await.
14 : ///
15 : /// Optionally pass a cancellation token, and this failpoint will drop out of
16 : /// its pause when the cancellation token fires. This is useful for testing
17 : /// cases where we would like to block something, but test its clean shutdown behavior.
18 : /// The macro evaluates to a Result in that case, where Ok(()) is the case
19 : /// where the failpoint was not paused, and Err() is the case where cancellation
20 : /// token fired while evaluating the failpoint.
21 : ///
22 : /// Remember to unpause the failpoint in the test; until that happens, one of the
23 : /// limited number of spawn_blocking thread pool threads is leaked.
24 : #[macro_export]
25 : macro_rules! pausable_failpoint {
26 : ($name:literal) => {{
27 : if cfg!(feature = "testing") {
28 : let cancel = ::tokio_util::sync::CancellationToken::new();
29 : let _ = $crate::pausable_failpoint!($name, &cancel);
30 : }
31 : }};
32 : ($name:literal, $cancel:expr) => {{
33 : if cfg!(feature = "testing") {
34 : let failpoint_fut = ::tokio::task::spawn_blocking({
35 : let current = ::tracing::Span::current();
36 12591 : move || {
37 12591 : let _entered = current.entered();
38 12591 : ::tracing::info!("at failpoint {}", $name);
39 12591 : ::fail::fail_point!($name);
40 12591 : }
41 : });
42 12205 : let cancel_fut = async move {
43 12205 : $cancel.cancelled().await;
44 : };
45 : ::tokio::select! {
46 : res = failpoint_fut => {
47 : res.expect("spawn_blocking");
48 : // continue with execution
49 : Ok(())
50 : },
51 : _ = cancel_fut => {
52 : Err(())
53 : }
54 : }
55 : } else {
56 : Ok(())
57 : }
58 : }};
59 : }
60 :
61 : pub use pausable_failpoint;
62 :
/// Async sleep failpoint, to be configured with `fail::cfg("$name", "return(2000)")`.
///
/// When the failpoint fires with a `return(<millis>)` action, the caller sleeps
/// for that many milliseconds. This behaves like a `sleep(2000)` action, with two
/// differences:
/// - The sleep uses tokio's async sleep, so the executor thread is not blocked.
/// - A log line is printed before and after the sleep, which tests can use to
///   check that the failpoint was hit.
///
/// Optionally pass a cancellation token as the second argument. The sleep then
/// ends early as soon as the token fires. This is useful for tests that block
/// something and then check its clean shutdown behavior.
#[macro_export]
macro_rules! __failpoint_sleep_millis_async {
    ($name:literal) => {{
        // Holds the value of a `return(<value>)` action; `None` when the
        // failpoint did not return a value.
        let requested_millis = (|| {
            ::fail::fail_point!($name, |value| value);
            ::std::option::Option::None
        })();

        if let ::std::option::Option::Some(millis) = requested_millis {
            $crate::failpoint_support::failpoint_sleep_helper($name, millis).await
        }
    }};
    ($name:literal, $cancel:expr) => {{
        // Holds the value of a `return(<value>)` action; `None` when the
        // failpoint did not return a value.
        let requested_millis = (|| {
            ::fail::fail_point!($name, |value| value);
            ::std::option::Option::None
        })();

        if let ::std::option::Option::Some(millis) = requested_millis {
            $crate::failpoint_support::failpoint_sleep_cancellable_helper($name, millis, $cancel)
                .await
        }
    }};
}
pub use __failpoint_sleep_millis_async as sleep_millis_async;
108 :
109 : // Helper function used by the macro. (A function has nicer scoping so we
110 : // don't need to decorate everything with "::")
111 : #[doc(hidden)]
112 0 : pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
113 0 : let millis = duration_str.parse::<u64>().unwrap();
114 0 : let d = std::time::Duration::from_millis(millis);
115 0 :
116 0 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
117 0 : tokio::time::sleep(d).await;
118 0 : tracing::info!("failpoint {:?}: sleep done", name);
119 0 : }
120 :
121 : // Helper function used by the macro. (A function has nicer scoping so we
122 : // don't need to decorate everything with "::")
123 : #[doc(hidden)]
124 0 : pub async fn failpoint_sleep_cancellable_helper(
125 0 : name: &'static str,
126 0 : duration_str: String,
127 0 : cancel: &CancellationToken,
128 0 : ) {
129 0 : let millis = duration_str.parse::<u64>().unwrap();
130 0 : let d = std::time::Duration::from_millis(millis);
131 0 :
132 0 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
133 0 : tokio::time::timeout(d, cancel.cancelled()).await.ok();
134 0 : tracing::info!("failpoint {:?}: sleep done", name);
135 0 : }
136 :
137 0 : pub fn init() -> fail::FailScenario<'static> {
138 0 : // The failpoints lib provides support for parsing the `FAILPOINTS` env var.
139 0 : // We want non-default behavior for `exit`, though, so, we handle it separately.
140 0 : //
141 0 : // Format for FAILPOINTS is "name=actions" separated by ";".
142 0 : let actions = std::env::var("FAILPOINTS");
143 0 : if actions.is_ok() {
144 0 : std::env::remove_var("FAILPOINTS");
145 0 : } else {
146 0 : // let the library handle non-utf8, or nothing for not present
147 0 : }
148 :
149 0 : let scenario = fail::FailScenario::setup();
150 :
151 0 : if let Ok(val) = actions {
152 0 : val.split(';')
153 0 : .enumerate()
154 0 : .map(|(i, s)| s.split_once('=').ok_or((i, s)))
155 0 : .for_each(|res| {
156 0 : let (name, actions) = match res {
157 0 : Ok(t) => t,
158 0 : Err((i, s)) => {
159 0 : panic!(
160 0 : "startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
161 0 : i + 1,
162 0 : );
163 : }
164 : };
165 0 : if let Err(e) = apply_failpoint(name, actions) {
166 0 : panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
167 0 : }
168 0 : });
169 0 : }
170 :
171 0 : scenario
172 0 : }
173 :
174 0 : pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
175 0 : if actions == "exit" {
176 0 : fail::cfg_callback(name, exit_failpoint)
177 : } else {
178 0 : fail::cfg(name, actions)
179 : }
180 0 : }
181 :
182 : #[inline(never)]
183 0 : fn exit_failpoint() {
184 0 : tracing::info!("Exit requested by failpoint");
185 0 : std::process::exit(1);
186 : }
187 :
188 : pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
189 :
190 : /// Information for configuring a single fail point
191 0 : #[derive(Debug, Serialize, Deserialize)]
192 : pub struct FailpointConfig {
193 : /// Name of the fail point
194 : pub name: String,
195 : /// List of actions to take, using the format described in `fail::cfg`
196 : ///
197 : /// We also support `actions = "exit"` to cause the fail point to immediately exit.
198 : pub actions: String,
199 : }
200 :
201 : /// Configure failpoints through http.
202 0 : pub async fn failpoints_handler(
203 0 : mut request: Request<Body>,
204 0 : _cancel: CancellationToken,
205 0 : ) -> Result<Response<Body>, ApiError> {
206 0 : if !fail::has_failpoints() {
207 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
208 0 : "Cannot manage failpoints because neon was compiled without failpoints support"
209 0 : )));
210 0 : }
211 :
212 0 : let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
213 0 : for fp in failpoints {
214 0 : info!("cfg failpoint: {} {}", fp.name, fp.actions);
215 :
216 : // We recognize one extra "action" that's not natively recognized
217 : // by the failpoints crate: exit, to immediately kill the process
218 0 : let cfg_result = apply_failpoint(&fp.name, &fp.actions);
219 :
220 0 : if let Err(err_msg) = cfg_result {
221 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
222 0 : "Failed to configure failpoints: {err_msg}"
223 0 : )));
224 0 : }
225 : }
226 :
227 0 : json_response(StatusCode::OK, ())
228 0 : }
|