Line data Source code
1 : use std::fmt::{Debug, Display};
2 :
3 : use futures::Future;
4 : use tokio_util::sync::CancellationToken;
5 :
/// Default `base_increment` handed to `exponential_backoff` by `retry`: the backoff for
/// attempt `n > 0` is `(1.0 + base_increment)^n` seconds before capping.
6 : pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
/// Default `max_seconds` handed to `exponential_backoff` by `retry`: upper bound, in seconds,
/// applied to a single computed backoff interval.
7 : pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
8 :
9 4399 : pub async fn exponential_backoff(
10 4399 : n: u32,
11 4399 : base_increment: f64,
12 4399 : max_seconds: f64,
13 4399 : cancel: &CancellationToken,
14 4399 : ) {
15 4395 : let backoff_duration_seconds =
16 4395 : exponential_backoff_duration_seconds(n, base_increment, max_seconds);
17 4395 : if backoff_duration_seconds > 0.0 {
18 42 : tracing::info!(
19 42 : "Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
20 42 : );
21 :
22 : drop(
23 44 : tokio::time::timeout(
24 44 : std::time::Duration::from_secs_f64(backoff_duration_seconds),
25 44 : cancel.cancelled(),
26 44 : )
27 40 : .await,
28 : )
29 4351 : }
30 4389 : }
31 :
/// Computes the backoff interval, in seconds, for attempt number `n`.
///
/// Attempt `0` never waits and yields `0.0`. Any later attempt yields
/// `(1.0 + base_increment)^n`, limited from above by `max_seconds`.
pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
    match n {
        0 => 0.0,
        attempt => {
            let growth_factor = 1.0 + base_increment;
            let uncapped_seconds = growth_factor.powf(f64::from(attempt));
            uncapped_seconds.min(max_seconds)
        }
    }
}
39 :
40 : /// Retries passed operation until one of the following conditions are met:
41 : /// - encountered error is considered as permanent (non-retryable)
42 : /// - retries have been exhausted
43 : /// - cancellation token has been cancelled
44 : ///
45 : /// `is_permanent` closure should be used to provide distinction between permanent/non-permanent
46 : /// errors. When attempts cross `warn_threshold` function starts to emit log warnings.
47 : /// `description` argument is added to log messages. Its value should identify the `op` is doing
48 : /// `cancel` cancels new attempts and the backoff sleep.
49 : ///
50 : /// If attempts fail, they are being logged with `{:#}` which works for anyhow, but does not work
51 : /// for any other error type. Final failed attempt is logged with `{:?}`.
52 : ///
53 : /// Returns `None` if cancellation was noticed during backoff or the terminal result.
54 15804 : pub async fn retry<T, O, F, E>(
55 15804 : mut op: O,
56 15804 : is_permanent: impl Fn(&E) -> bool,
57 15804 : warn_threshold: u32,
58 15804 : max_retries: u32,
59 15804 : description: &str,
60 15804 : cancel: &CancellationToken,
61 15804 : ) -> Option<Result<T, E>>
62 15804 : where
63 15804 : // Not std::error::Error because anyhow::Error doesnt implement it.
64 15804 : // For context see https://github.com/dtolnay/anyhow/issues/63
65 15804 : E: Display + Debug + 'static,
66 15804 : O: FnMut() -> F,
67 15804 : F: Future<Output = Result<T, E>>,
68 15804 : {
69 15804 : let mut attempts = 0;
70 : loop {
71 16394 : if cancel.is_cancelled() {
72 4 : return None;
73 16390 : }
74 :
75 525169 : let result = op().await;
76 29 : match &result {
77 : Ok(_) => {
78 15157 : if attempts > 0 {
79 489 : tracing::info!("{description} succeeded after {attempts} retries");
80 14642 : }
81 15157 : return Some(result);
82 : }
83 :
84 : // These are "permanent" errors that should not be retried.
85 1233 : Err(e) if is_permanent(e) => {
86 634 : return Some(result);
87 : }
88 : // Assume that any other failure might be transient, and the operation might
89 : // succeed if we just keep trying.
90 599 : Err(err) if attempts < warn_threshold => {
91 540 : tracing::info!("{description} failed, will retry (attempt {attempts}): {err:#}");
92 : }
93 29 : Err(err) if attempts < max_retries => {
94 25 : tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
95 : }
96 4 : Err(err) => {
97 : // Operation failed `max_attempts` times. Time to give up.
98 2 : tracing::warn!(
99 2 : "{description} still failed after {attempts} retries, giving up: {err:?}"
100 2 : );
101 4 : return Some(result);
102 : }
103 : }
104 : // sleep and retry
105 595 : exponential_backoff(
106 595 : attempts,
107 595 : DEFAULT_BASE_BACKOFF_SECONDS,
108 595 : DEFAULT_MAX_BACKOFF_SECONDS,
109 595 : cancel,
110 595 : )
111 36 : .await;
112 590 : attempts += 1;
113 : }
114 15799 : }
115 :
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use tokio::sync::Mutex;

    /// With the default parameters the backoff never shrinks from one attempt to the next
    /// and ends up at the configured maximum.
    #[test]
    fn backoff_defaults_produce_growing_backoff_sequence() {
        let mut current_backoff_value: Option<f64> = None;

        for i in 0..10_000 {
            let new_backoff_value = exponential_backoff_duration_seconds(
                i,
                DEFAULT_BASE_BACKOFF_SECONDS,
                DEFAULT_MAX_BACKOFF_SECONDS,
            );

            let previous = std::mem::replace(&mut current_backoff_value, Some(new_backoff_value));
            if let Some(old_backoff_value) = previous {
                assert!(
                    old_backoff_value <= new_backoff_value,
                    "{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
                );
            }
        }

        let last_backoff_value =
            current_backoff_value.expect("Should have produced backoff values to compare");
        assert_eq!(
            last_backoff_value, DEFAULT_MAX_BACKOFF_SECONDS,
            "Given big enough of retries, backoff should reach its allowed max value"
        );
    }

    /// An operation that always fails with a retryable error is invoked once plus
    /// `max_retries` more times, and the last error is returned.
    #[tokio::test(start_paused = true)]
    async fn retry_always_error() {
        let count = Mutex::new(0);
        let cancel = CancellationToken::new();

        let outcome = retry(
            || async {
                *count.lock().await += 1;
                Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other))
            },
            |_e| false,
            1,
            1,
            "work",
            &cancel,
        )
        .await;

        outcome
            .expect("not cancelled")
            .expect_err("it can only fail");

        assert_eq!(*count.lock().await, 2);
    }

    /// An operation that fails twice with retryable errors and then succeeds yields `Ok`.
    #[tokio::test(start_paused = true)]
    async fn retry_ok_after_err() {
        let count = Mutex::new(0);
        let cancel = CancellationToken::new();

        let outcome = retry(
            || async {
                let mut failures = count.lock().await;
                if *failures <= 1 {
                    *failures += 1;
                    Err(io::Error::from(io::ErrorKind::Other))
                } else {
                    Ok(())
                }
            },
            |_e| false,
            2,
            2,
            "work",
            &cancel,
        )
        .await;

        outcome
            .expect("not cancelled")
            .expect("success on second try");
    }

    /// An error classified as permanent is returned after a single invocation.
    #[tokio::test(start_paused = true)]
    async fn dont_retry_permanent_errors() {
        let count = Mutex::new(0);
        let cancel = CancellationToken::new();

        let outcome = retry(
            || async {
                let mut failures = count.lock().await;
                if *failures <= 1 {
                    *failures += 1;
                    Err(io::Error::from(io::ErrorKind::Other))
                } else {
                    Ok(())
                }
            },
            |_e| true,
            2,
            2,
            "work",
            &cancel,
        )
        .await;

        let _ = outcome
            .expect("was not cancellation")
            .expect_err("it was permanent error");

        assert_eq!(*count.lock().await, 1);
    }
}
|