Line data Source code
1 : use std::fmt::{Debug, Display};
2 :
3 : use futures::Future;
4 : use tokio_util::sync::CancellationToken;
5 :
/// Default growth parameter used by [`retry`]: the backoff before retry `n` (for `n >= 1`)
/// is `(1 + DEFAULT_BASE_BACKOFF_SECONDS)^n` seconds, see
/// [`exponential_backoff_duration_seconds`].
pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;

/// Default cap, in seconds, on a single backoff delay computed for [`retry`].
pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
8 :
9 4353 : pub async fn exponential_backoff(
10 4353 : n: u32,
11 4353 : base_increment: f64,
12 4353 : max_seconds: f64,
13 4353 : cancel: &CancellationToken,
14 4353 : ) {
15 4351 : let backoff_duration_seconds =
16 4351 : exponential_backoff_duration_seconds(n, base_increment, max_seconds);
17 4351 : if backoff_duration_seconds > 0.0 {
18 34 : tracing::info!(
19 34 : "Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
20 34 : );
21 :
22 : drop(
23 36 : tokio::time::timeout(
24 36 : std::time::Duration::from_secs_f64(backoff_duration_seconds),
25 36 : cancel.cancelled(),
26 36 : )
27 36 : .await,
28 : )
29 4315 : }
30 4348 : }
31 :
/// Computes how many seconds to back off before attempt number `n`.
///
/// Attempt `0` never waits. For `n >= 1` the delay grows geometrically as
/// `(1 + base_increment)^n` and is capped at `max_seconds`.
pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
    match n {
        0 => 0.0,
        attempt => {
            let growth_factor = 1.0 + base_increment;
            let uncapped = growth_factor.powf(f64::from(attempt));
            uncapped.min(max_seconds)
        }
    }
}
39 :
40 : /// Retries passed operation until one of the following conditions are met:
41 : /// - encountered error is considered as permanent (non-retryable)
42 : /// - retries have been exhausted
43 : /// - cancellation token has been cancelled
44 : ///
45 : /// `is_permanent` closure should be used to provide distinction between permanent/non-permanent
46 : /// errors. When attempts cross `warn_threshold` function starts to emit log warnings.
47 : /// `description` argument is added to log messages. Its value should identify the `op` is doing
48 : /// `cancel` cancels new attempts and the backoff sleep.
49 : ///
50 : /// If attempts fail, they are being logged with `{:#}` which works for anyhow, but does not work
51 : /// for any other error type. Final failed attempt is logged with `{:?}`.
52 : ///
53 : /// Returns `None` if cancellation was noticed during backoff or the terminal result.
54 15893 : pub async fn retry<T, O, F, E>(
55 15893 : mut op: O,
56 15893 : is_permanent: impl Fn(&E) -> bool,
57 15893 : warn_threshold: u32,
58 15893 : max_retries: u32,
59 15893 : description: &str,
60 15893 : cancel: &CancellationToken,
61 15893 : ) -> Option<Result<T, E>>
62 15893 : where
63 15893 : // Not std::error::Error because anyhow::Error doesnt implement it.
64 15893 : // For context see https://github.com/dtolnay/anyhow/issues/63
65 15893 : E: Display + Debug + 'static,
66 15893 : O: FnMut() -> F,
67 15893 : F: Future<Output = Result<T, E>>,
68 15893 : {
69 15893 : let mut attempts = 0;
70 : loop {
71 16482 : if cancel.is_cancelled() {
72 10 : return None;
73 16472 : }
74 :
75 478289 : let result = op().await;
76 21 : match &result {
77 : Ok(_) => {
78 15255 : if attempts > 0 {
79 489 : tracing::info!("{description} succeeded after {attempts} retries");
80 14740 : }
81 15255 : return Some(result);
82 : }
83 :
84 : // These are "permanent" errors that should not be retried.
85 1217 : Err(e) if is_permanent(e) => {
86 622 : return Some(result);
87 : }
88 : // Assume that any other failure might be transient, and the operation might
89 : // succeed if we just keep trying.
90 595 : Err(err) if attempts < warn_threshold => {
91 544 : tracing::info!("{description} failed, will retry (attempt {attempts}): {err:#}");
92 : }
93 21 : Err(err) if attempts < max_retries => {
94 18 : tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
95 : }
96 3 : Err(err) => {
97 : // Operation failed `max_attempts` times. Time to give up.
98 1 : tracing::warn!(
99 1 : "{description} still failed after {attempts} retries, giving up: {err:?}"
100 1 : );
101 3 : return Some(result);
102 : }
103 : }
104 : // sleep and retry
105 592 : exponential_backoff(
106 592 : attempts,
107 592 : DEFAULT_BASE_BACKOFF_SECONDS,
108 592 : DEFAULT_MAX_BACKOFF_SECONDS,
109 592 : cancel,
110 592 : )
111 32 : .await;
112 589 : attempts += 1;
113 : }
114 15890 : }
115 :
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// The default parameters must yield a non-decreasing sequence that reaches the cap.
    #[test]
    fn backoff_defaults_produce_growing_backoff_sequence() {
        let mut current_backoff_value = None;

        for i in 0..10_000 {
            let new_backoff_value = exponential_backoff_duration_seconds(
                i,
                DEFAULT_BASE_BACKOFF_SECONDS,
                DEFAULT_MAX_BACKOFF_SECONDS,
            );

            if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
                assert!(
                    old_backoff_value <= new_backoff_value,
                    "{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
                )
            }
        }

        assert_eq!(
            current_backoff_value.expect("Should have produced backoff values to compare"),
            DEFAULT_MAX_BACKOFF_SECONDS,
            "Given big enough of retries, backoff should reach its allowed max value"
        );
    }

    /// A transiently failing operation is attempted once plus `max_retries` times.
    #[tokio::test(start_paused = true)]
    async fn retry_always_error() {
        let count = AtomicU32::new(0);
        retry(
            || async {
                count.fetch_add(1, Ordering::SeqCst);
                Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other))
            },
            |_e| false,
            1,
            1,
            "work",
            &CancellationToken::new(),
        )
        .await
        .expect("not cancelled")
        .expect_err("it can only fail");

        assert_eq!(count.load(Ordering::SeqCst), 2);
    }

    /// Two transient failures followed by a success are retried through to the success.
    #[tokio::test(start_paused = true)]
    async fn retry_ok_after_err() {
        let count = AtomicU32::new(0);
        retry(
            || async {
                // The first two calls fail, the third one succeeds.
                if count.fetch_add(1, Ordering::SeqCst) >= 2 {
                    Ok(())
                } else {
                    Err(io::Error::from(io::ErrorKind::Other))
                }
            },
            |_e| false,
            2,
            2,
            "work",
            &CancellationToken::new(),
        )
        .await
        .expect("not cancelled")
        .expect("success on second retry");

        assert_eq!(count.load(Ordering::SeqCst), 3);
    }

    /// An error classified as permanent is returned after a single attempt.
    #[tokio::test(start_paused = true)]
    async fn dont_retry_permanent_errors() {
        let count = AtomicU32::new(0);
        let _ = retry(
            || async {
                count.fetch_add(1, Ordering::SeqCst);
                Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other))
            },
            |_e| true,
            2,
            2,
            "work",
            &CancellationToken::new(),
        )
        .await
        .expect("was not cancellation")
        .expect_err("it was permanent error");

        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    /// An already-cancelled token yields `None` without running the operation at all.
    #[tokio::test(start_paused = true)]
    async fn retry_returns_none_when_cancelled() {
        let count = AtomicU32::new(0);
        let cancel = CancellationToken::new();
        cancel.cancel();

        let result = retry(
            || async {
                count.fetch_add(1, Ordering::SeqCst);
                Result::<(), io::Error>::Ok(())
            },
            |_e| false,
            1,
            1,
            "work",
            &cancel,
        )
        .await;

        assert!(result.is_none());
        assert_eq!(count.load(Ordering::SeqCst), 0);
    }
}
|