1 : //! Failpoint support code shared between pageserver and safekeepers.
2 :
3 : use tokio_util::sync::CancellationToken;
4 :
/// Declare a failpoint that supports the `pause` action without blocking the
/// async executor: the failpoint is evaluated inside `spawn_blocking` and the
/// resulting task is awaited.
///
/// Optionally pass a cancellation token (`$cancel`), and this failpoint will drop
/// out of its pause when the cancellation token fires. This is useful for testing
/// cases where we would like to block something, but test its clean shutdown behavior.
/// In that form the macro evaluates to a `Result<(), ()>`: `Ok(())` when the
/// failpoint evaluation finished (it was not paused, or it has been unpaused), and
/// `Err(())` when the cancellation token fired first. Without the `testing`
/// feature the failpoint is skipped entirely and the macro evaluates to `Ok(())`.
///
/// The single-argument form uses a token that is never cancelled, so it only
/// returns once the failpoint is no longer paused; its result is discarded.
///
/// Remember to unpause the failpoint in the test; until that happens, one of the
/// limited number of spawn_blocking thread pool threads is leaked. This also holds
/// after cancellation: the blocking task is merely no longer awaited, not stopped.
#[macro_export]
macro_rules! pausable_failpoint {
    ($name:literal) => {{
        if cfg!(feature = "testing") {
            // Fresh token that nobody cancels: we wait until the failpoint lets go.
            let cancel = ::tokio_util::sync::CancellationToken::new();
            let _ = $crate::pausable_failpoint!($name, &cancel);
        }
    }};
    ($name:literal, $cancel:expr) => {{
        // `cfg!` is expanded at the macro call site, so this checks the feature
        // flags of the crate that invokes the macro.
        if cfg!(feature = "testing") {
            // Evaluate the failpoint on the blocking thread pool so that a `pause`
            // action blocks that thread instead of an executor thread.
            let failpoint_fut = ::tokio::task::spawn_blocking({
                // Carry the caller's tracing span over to the blocking thread.
                let current = ::tracing::Span::current();
                move || {
                    let _entered = current.entered();
                    ::tracing::info!("at failpoint {}", $name);
                    ::fail::fail_point!($name);
                }
            });
            let cancel_fut = async move {
                $cancel.cancelled().await;
            };
            ::tokio::select! {
                res = failpoint_fut => {
                    // A failed blocking task (e.g. one hit by a `panic` action)
                    // is re-raised here as a panic.
                    res.expect("spawn_blocking");
                    // continue with execution
                    Ok(())
                },
                _ = cancel_fut => {
                    // Dropping `failpoint_fut` only detaches the blocking task; it
                    // stays parked in the failpoint until it is unpaused.
                    Err(())
                }
            }
        } else {
            Ok(())
        }
    }};
}

// `#[macro_export]` places the macro at the crate root; re-export it so it is
// also reachable through this module's path.
pub use pausable_failpoint;
55 :
/// Async, optionally cancellable "sleep" failpoint.
///
/// Use with `fail::cfg("$name", "return(2000)")`.
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that we use the async
/// tokio sleep function. Another difference is that we print lines to the log,
/// which can be useful in tests to check that the failpoint was hit.
///
/// Optionally pass a cancellation token (as a `&CancellationToken`), and this
/// failpoint will drop out of its sleep when the cancellation token fires. This is
/// useful for testing cases where we would like to block something, but test its
/// clean shutdown behavior.
///
/// The expansion contains an `.await`, so it must be used in an async context.
/// A `return` action without a value does not sleep; a value that does not parse
/// as a `u64` number of milliseconds makes the sleep helper panic.
///
/// NOTE(review): actions other than `return` (e.g. `pause` or `sleep`) are
/// evaluated synchronously by `fail_point!` on the calling task and would block
/// the executor thread; `pausable_failpoint!` exists for pausing.
#[macro_export]
macro_rules! __failpoint_sleep_millis_async {
    ($name:literal) => {{
        // If the failpoint is used with a "return" action, set should_sleep to the
        // returned value (as string). Otherwise it's set to None.
        // `fail_point!` with a closure executes `return <closure result>`, so it is
        // wrapped in an immediately-invoked closure: the value is captured here
        // instead of returning from the caller's function.
        let should_sleep = (|| {
            ::fail::fail_point!($name, |x| x);
            ::std::option::Option::None
        })();

        // Sleep if the action was a returned value
        if let ::std::option::Option::Some(duration_str) = should_sleep {
            $crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await
        }
    }};
    ($name:literal, $cancel:expr) => {{
        // If the failpoint is used with a "return" action, set should_sleep to the
        // returned value (as string). Otherwise it's set to None.
        // Same immediately-invoked-closure trick as in the single-argument arm.
        let should_sleep = (|| {
            ::fail::fail_point!($name, |x| x);
            ::std::option::Option::None
        })();

        // Sleep if the action was a returned value; `$cancel` is handed to the
        // helper as its `&CancellationToken` argument and can cut the sleep short.
        if let ::std::option::Option::Some(duration_str) = should_sleep {
            $crate::failpoint_support::failpoint_sleep_cancellable_helper(
                $name,
                duration_str,
                $cancel,
            )
            .await
        }
    }};
}
// `#[macro_export]` places the macro at the crate root under its internal
// `__`-prefixed name; this re-export exposes it as `sleep_millis_async!`
// through this module's path.
pub use __failpoint_sleep_millis_async as sleep_millis_async;
101 :
102 : // Helper function used by the macro. (A function has nicer scoping so we
103 : // don't need to decorate everything with "::")
104 : #[doc(hidden)]
105 0 : pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
106 0 : let millis = duration_str.parse::<u64>().unwrap();
107 0 : let d = std::time::Duration::from_millis(millis);
108 0 :
109 0 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
110 0 : tokio::time::sleep(d).await;
111 0 : tracing::info!("failpoint {:?}: sleep done", name);
112 0 : }
113 :
114 : // Helper function used by the macro. (A function has nicer scoping so we
115 : // don't need to decorate everything with "::")
116 : #[doc(hidden)]
117 0 : pub async fn failpoint_sleep_cancellable_helper(
118 0 : name: &'static str,
119 0 : duration_str: String,
120 0 : cancel: &CancellationToken,
121 0 : ) {
122 0 : let millis = duration_str.parse::<u64>().unwrap();
123 0 : let d = std::time::Duration::from_millis(millis);
124 0 :
125 0 : tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
126 0 : tokio::time::timeout(d, cancel.cancelled()).await.ok();
127 0 : tracing::info!("failpoint {:?}: sleep done", name);
128 0 : }
129 :
130 : /// Initialize the configured failpoints
131 : ///
132 : /// You must call this function before any concurrent threads do operations.
133 0 : pub fn init() -> fail::FailScenario<'static> {
134 0 : // The failpoints lib provides support for parsing the `FAILPOINTS` env var.
135 0 : // We want non-default behavior for `exit`, though, so, we handle it separately.
136 0 : //
137 0 : // Format for FAILPOINTS is "name=actions" separated by ";".
138 0 : let actions = std::env::var("FAILPOINTS");
139 0 : if actions.is_ok() {
140 : // SAFETY: this function should before any threads start and access env vars concurrently
141 0 : unsafe {
142 0 : std::env::remove_var("FAILPOINTS");
143 0 : }
144 0 : } else {
145 0 : // let the library handle non-utf8, or nothing for not present
146 0 : }
147 :
148 0 : let scenario = fail::FailScenario::setup();
149 :
150 0 : if let Ok(val) = actions {
151 0 : val.split(';')
152 0 : .enumerate()
153 0 : .map(|(i, s)| s.split_once('=').ok_or((i, s)))
154 0 : .for_each(|res| {
155 0 : let (name, actions) = match res {
156 0 : Ok(t) => t,
157 0 : Err((i, s)) => {
158 0 : panic!(
159 0 : "startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
160 0 : i + 1,
161 0 : );
162 : }
163 : };
164 0 : if let Err(e) = apply_failpoint(name, actions) {
165 0 : panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
166 0 : }
167 0 : });
168 0 : }
169 :
170 0 : scenario
171 0 : }
172 :
173 0 : pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
174 0 : if actions == "exit" {
175 0 : fail::cfg_callback(name, exit_failpoint)
176 : } else {
177 0 : fail::cfg(name, actions)
178 : }
179 0 : }
180 :
181 : #[inline(never)]
182 0 : fn exit_failpoint() {
183 0 : tracing::info!("Exit requested by failpoint");
184 0 : std::process::exit(1);
185 : }