Line data Source code
1 : use std::fmt::{Debug, Display};
2 :
3 : use futures::Future;
4 : use tokio_util::sync::CancellationToken;
5 :
/// Default `base_increment` handed to `exponential_backoff` by `retry`; the backoff formula
/// raises `1.0 + base_increment` to the power of the attempt number.
6 : pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
/// Default upper bound, in seconds, that `retry` passes as `max_seconds` to cap a single
/// backoff delay.
7 : pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
8 :
9 1222 : pub async fn exponential_backoff(
10 1222 : n: u32,
11 1222 : base_increment: f64,
12 1222 : max_seconds: f64,
13 1222 : cancel: &CancellationToken,
14 1222 : ) {
15 1222 : let backoff_duration_seconds =
16 1222 : exponential_backoff_duration_seconds(n, base_increment, max_seconds);
17 1222 : if backoff_duration_seconds > 0.0 {
18 6 : tracing::info!(
19 0 : "Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
20 : );
21 :
22 : drop(
23 6 : tokio::time::timeout(
24 6 : std::time::Duration::from_secs_f64(backoff_duration_seconds),
25 6 : cancel.cancelled(),
26 6 : )
27 6 : .await,
28 : )
29 1216 : }
30 1222 : }
31 :
/// Computes the backoff delay, in seconds, for attempt number `n`.
///
/// Attempt `0` never waits and yields `0.0`. For every later attempt the delay is
/// `(1.0 + base_increment)` raised to the power `n`, limited from above by `max_seconds`.
pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
    match n {
        0 => 0.0,
        attempt => {
            let growth_factor = 1.0 + base_increment;
            let uncapped_seconds = growth_factor.powf(f64::from(attempt));
            uncapped_seconds.min(max_seconds)
        }
    }
}
39 :
40 : /// Retries passed operation until one of the following conditions are met:
41 : /// - encountered error is considered as permanent (non-retryable)
42 : /// - retries have been exhausted
43 : /// - cancellation token has been cancelled
44 : ///
45 : /// `is_permanent` closure should be used to provide distinction between permanent/non-permanent
46 : /// errors. When attempts cross `warn_threshold` function starts to emit log warnings.
47 : /// `description` argument is added to log messages. Its value should identify the `op` is doing
48 : /// `cancel` cancels new attempts and the backoff sleep.
49 : ///
50 : /// If attempts fail, they are being logged with `{:#}` which works for anyhow, but does not work
51 : /// for any other error type. Final failed attempt is logged with `{:?}`.
52 : ///
53 : /// Returns `None` if cancellation was noticed during backoff or the terminal result.
54 1017 : pub async fn retry<T, O, F, E>(
55 1017 : mut op: O,
56 1017 : is_permanent: impl Fn(&E) -> bool,
57 1017 : warn_threshold: u32,
58 1017 : max_retries: u32,
59 1017 : description: &str,
60 1017 : cancel: &CancellationToken,
61 1017 : ) -> Option<Result<T, E>>
62 1017 : where
63 1017 : // Not std::error::Error because anyhow::Error doesnt implement it.
64 1017 : // For context see https://github.com/dtolnay/anyhow/issues/63
65 1017 : E: Display + Debug + 'static,
66 1017 : O: FnMut() -> F,
67 1017 : F: Future<Output = Result<T, E>>,
68 1017 : {
69 1017 : let mut attempts = 0;
70 : loop {
71 1110 : if cancel.is_cancelled() {
72 0 : return None;
73 1110 : }
74 :
75 5418 : let result = op().await;
76 6 : match &result {
77 : Ok(_) => {
78 963 : if attempts > 0 {
79 81 : tracing::info!("{description} succeeded after {attempts} retries");
80 882 : }
81 963 : return Some(result);
82 : }
83 :
84 : // These are "permanent" errors that should not be retried.
85 147 : Err(e) if is_permanent(e) => {
86 48 : return Some(result);
87 : }
88 : // Assume that any other failure might be transient, and the operation might
89 : // succeed if we just keep trying.
90 99 : Err(err) if attempts < warn_threshold => {
91 93 : tracing::info!("{description} failed, will retry (attempt {attempts}): {err:#}");
92 : }
93 6 : Err(err) if attempts < max_retries => {
94 0 : tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
95 : }
96 6 : Err(err) => {
97 6 : // Operation failed `max_attempts` times. Time to give up.
98 6 : tracing::warn!(
99 0 : "{description} still failed after {attempts} retries, giving up: {err:?}"
100 : );
101 6 : return Some(result);
102 : }
103 : }
104 : // sleep and retry
105 93 : exponential_backoff(
106 93 : attempts,
107 93 : DEFAULT_BASE_BACKOFF_SECONDS,
108 93 : DEFAULT_MAX_BACKOFF_SECONDS,
109 93 : cancel,
110 93 : )
111 6 : .await;
112 93 : attempts += 1;
113 : }
114 1017 : }
115 :
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use tokio::sync::Mutex;

    /// With the default parameters the delay never shrinks as the attempt number grows and
    /// it ends up at the configured maximum.
    #[test]
    fn backoff_defaults_produce_growing_backoff_sequence() {
        let mut last_seen: Option<f64> = None;

        for i in 0..10_000 {
            let new_backoff_value = exponential_backoff_duration_seconds(
                i,
                DEFAULT_BASE_BACKOFF_SECONDS,
                DEFAULT_MAX_BACKOFF_SECONDS,
            );

            if let Some(old_backoff_value) = last_seen {
                assert!(
                    old_backoff_value <= new_backoff_value,
                    "{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
                );
            }
            last_seen = Some(new_backoff_value);
        }

        let final_backoff_value =
            last_seen.expect("Should have produced backoff values to compare");
        assert_eq!(
            final_backoff_value, DEFAULT_MAX_BACKOFF_SECONDS,
            "Given big enough of retries, backoff should reach its allowed max value"
        );
    }

    /// An operation that always fails with a retryable error is invoked twice when one retry
    /// is allowed, and the last error is handed back.
    #[tokio::test(start_paused = true)]
    async fn retry_always_error() {
        let calls = Mutex::new(0);
        let cancel = CancellationToken::new();
        let always_fail = || async {
            *calls.lock().await += 1;
            Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other))
        };

        let outcome = retry(always_fail, |_e| false, 1, 1, "work", &cancel).await;
        outcome
            .expect("not cancelled")
            .expect_err("it can only fail");

        assert_eq!(*calls.lock().await, 2);
    }

    /// An operation that fails twice with retryable errors and then succeeds yields `Ok`
    /// when two retries are allowed.
    #[tokio::test(start_paused = true)]
    async fn retry_ok_after_err() {
        let failures = Mutex::new(0);
        let cancel = CancellationToken::new();
        let fail_twice_then_succeed = || async {
            let mut seen = failures.lock().await;
            if *seen <= 1 {
                *seen += 1;
                return Err(io::Error::from(io::ErrorKind::Other));
            }
            Ok(())
        };

        let outcome = retry(fail_twice_then_succeed, |_e| false, 2, 2, "work", &cancel).await;
        outcome
            .expect("not cancelled")
            .expect("success on second try");
    }

    /// When every error is classified as permanent, the operation runs exactly once and its
    /// error is returned.
    #[tokio::test(start_paused = true)]
    async fn dont_retry_permanent_errors() {
        let failures = Mutex::new(0);
        let cancel = CancellationToken::new();
        let fail_twice_then_succeed = || async {
            let mut seen = failures.lock().await;
            if *seen <= 1 {
                *seen += 1;
                return Err(io::Error::from(io::ErrorKind::Other));
            }
            Ok(())
        };

        let outcome = retry(fail_twice_then_succeed, |_e| true, 2, 2, "work", &cancel).await;
        let _ = outcome
            .expect("was not cancellation")
            .expect_err("it was permanent error");

        assert_eq!(*failures.lock().await, 1);
    }
}
|