LCOV - code coverage report
Current view: top level - libs/utils/src - backoff.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 98.7 % 155 153
Test Date: 2024-02-12 20:26:03 Functions: 43.9 % 237 104

            Line data    Source code
       1              : use std::fmt::{Debug, Display};
       2              : 
       3              : use futures::Future;
       4              : use tokio_util::sync::CancellationToken;
       5              : 
       6              : pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
       7              : pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
       8              : 
       9         4399 : pub async fn exponential_backoff(
      10         4399 :     n: u32,
      11         4399 :     base_increment: f64,
      12         4399 :     max_seconds: f64,
      13         4399 :     cancel: &CancellationToken,
      14         4399 : ) {
      15         4395 :     let backoff_duration_seconds =
      16         4395 :         exponential_backoff_duration_seconds(n, base_increment, max_seconds);
      17         4395 :     if backoff_duration_seconds > 0.0 {
      18           42 :         tracing::info!(
      19           42 :             "Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
      20           42 :         );
      21              : 
      22              :         drop(
      23           44 :             tokio::time::timeout(
      24           44 :                 std::time::Duration::from_secs_f64(backoff_duration_seconds),
      25           44 :                 cancel.cancelled(),
      26           44 :             )
      27           40 :             .await,
      28              :         )
      29         4351 :     }
      30         4389 : }
      31              : 
      32        24406 : pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
      33        24406 :     if n == 0 {
      34         4363 :         0.0
      35              :     } else {
      36        20043 :         (1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
      37              :     }
      38        24406 : }
      39              : 
      40              : /// Retries passed operation until one of the following conditions are met:
      41              : /// - encountered error is considered as permanent (non-retryable)
      42              : /// - retries have been exhausted
      43              : /// - cancellation token has been cancelled
      44              : ///
      45              : /// `is_permanent` closure should be used to provide distinction between permanent/non-permanent
      46              : /// errors. When attempts cross `warn_threshold` function starts to emit log warnings.
      47              : /// `description` argument is added to log messages. Its value should identify the `op` is doing
      48              : /// `cancel` cancels new attempts and the backoff sleep.
      49              : ///
      50              : /// If attempts fail, they are being logged with `{:#}` which works for anyhow, but does not work
      51              : /// for any other error type. Final failed attempt is logged with `{:?}`.
      52              : ///
      53              : /// Returns `None` if cancellation was noticed during backoff or the terminal result.
      54        15804 : pub async fn retry<T, O, F, E>(
      55        15804 :     mut op: O,
      56        15804 :     is_permanent: impl Fn(&E) -> bool,
      57        15804 :     warn_threshold: u32,
      58        15804 :     max_retries: u32,
      59        15804 :     description: &str,
      60        15804 :     cancel: &CancellationToken,
      61        15804 : ) -> Option<Result<T, E>>
      62        15804 : where
      63        15804 :     // Not std::error::Error because anyhow::Error doesnt implement it.
      64        15804 :     // For context see https://github.com/dtolnay/anyhow/issues/63
      65        15804 :     E: Display + Debug + 'static,
      66        15804 :     O: FnMut() -> F,
      67        15804 :     F: Future<Output = Result<T, E>>,
      68        15804 : {
      69        15804 :     let mut attempts = 0;
      70              :     loop {
      71        16394 :         if cancel.is_cancelled() {
      72            4 :             return None;
      73        16390 :         }
      74              : 
      75       525169 :         let result = op().await;
      76           29 :         match &result {
      77              :             Ok(_) => {
      78        15157 :                 if attempts > 0 {
      79          489 :                     tracing::info!("{description} succeeded after {attempts} retries");
      80        14642 :                 }
      81        15157 :                 return Some(result);
      82              :             }
      83              : 
      84              :             // These are "permanent" errors that should not be retried.
      85         1233 :             Err(e) if is_permanent(e) => {
      86          634 :                 return Some(result);
      87              :             }
      88              :             // Assume that any other failure might be transient, and the operation might
      89              :             // succeed if we just keep trying.
      90          599 :             Err(err) if attempts < warn_threshold => {
      91          540 :                 tracing::info!("{description} failed, will retry (attempt {attempts}): {err:#}");
      92              :             }
      93           29 :             Err(err) if attempts < max_retries => {
      94           25 :                 tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
      95              :             }
      96            4 :             Err(err) => {
      97              :                 // Operation failed `max_attempts` times. Time to give up.
      98            2 :                 tracing::warn!(
      99            2 :                     "{description} still failed after {attempts} retries, giving up: {err:?}"
     100            2 :                 );
     101            4 :                 return Some(result);
     102              :             }
     103              :         }
     104              :         // sleep and retry
     105          595 :         exponential_backoff(
     106          595 :             attempts,
     107          595 :             DEFAULT_BASE_BACKOFF_SECONDS,
     108          595 :             DEFAULT_MAX_BACKOFF_SECONDS,
     109          595 :             cancel,
     110          595 :         )
     111           36 :         .await;
     112          590 :         attempts += 1;
     113              :     }
     114        15799 : }
     115              : 
     116              : #[cfg(test)]
     117              : mod tests {
     118              :     use super::*;
     119              :     use std::io;
     120              :     use tokio::sync::Mutex;
     121              : 
     122            2 :     #[test]
     123            2 :     fn backoff_defaults_produce_growing_backoff_sequence() {
     124            2 :         let mut current_backoff_value = None;
     125              : 
     126        20002 :         for i in 0..10_000 {
     127        20000 :             let new_backoff_value = exponential_backoff_duration_seconds(
     128        20000 :                 i,
     129        20000 :                 DEFAULT_BASE_BACKOFF_SECONDS,
     130        20000 :                 DEFAULT_MAX_BACKOFF_SECONDS,
     131        20000 :             );
     132              : 
     133        20000 :             if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
     134        19998 :                 assert!(
     135        19998 :                     old_backoff_value <= new_backoff_value,
     136            0 :                     "{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
     137              :                 )
     138            2 :             }
     139              :         }
     140              : 
     141            2 :         assert_eq!(
     142            2 :             current_backoff_value.expect("Should have produced backoff values to compare"),
     143              :             DEFAULT_MAX_BACKOFF_SECONDS,
     144            0 :             "Given big enough of retries, backoff should reach its allowed max value"
     145              :         );
     146            2 :     }
     147              : 
     148            2 :     #[tokio::test(start_paused = true)]
     149            2 :     async fn retry_always_error() {
     150            2 :         let count = Mutex::new(0);
     151            2 :         retry(
     152            4 :             || async {
     153            4 :                 *count.lock().await += 1;
     154            4 :                 Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other))
     155            4 :             },
     156            4 :             |_e| false,
     157            2 :             1,
     158            2 :             1,
     159            2 :             "work",
     160            2 :             &CancellationToken::new(),
     161            2 :         )
     162            2 :         .await
     163            2 :         .expect("not cancelled")
     164            2 :         .expect_err("it can only fail");
     165            2 : 
     166            2 :         assert_eq!(*count.lock().await, 2);
     167            2 :     }
     168              : 
     169            2 :     #[tokio::test(start_paused = true)]
     170            2 :     async fn retry_ok_after_err() {
     171            2 :         let count = Mutex::new(0);
     172            2 :         retry(
     173            6 :             || async {
     174            6 :                 let mut locked = count.lock().await;
     175            6 :                 if *locked > 1 {
     176            2 :                     Ok(())
     177            2 :                 } else {
     178            4 :                     *locked += 1;
     179            4 :                     Err(io::Error::from(io::ErrorKind::Other))
     180            2 :                 }
     181           12 :             },
     182            4 :             |_e| false,
     183            2 :             2,
     184            2 :             2,
     185            2 :             "work",
     186            2 :             &CancellationToken::new(),
     187            2 :         )
     188            2 :         .await
     189            2 :         .expect("not cancelled")
     190            2 :         .expect("success on second try");
     191            2 :     }
     192              : 
     193            2 :     #[tokio::test(start_paused = true)]
     194            2 :     async fn dont_retry_permanent_errors() {
     195            2 :         let count = Mutex::new(0);
     196            2 :         let _ = retry(
     197            2 :             || async {
     198            2 :                 let mut locked = count.lock().await;
     199            2 :                 if *locked > 1 {
     200            2 :                     Ok(())
     201            2 :                 } else {
     202            2 :                     *locked += 1;
     203            2 :                     Err(io::Error::from(io::ErrorKind::Other))
     204            2 :                 }
     205            4 :             },
     206            2 :             |_e| true,
     207            2 :             2,
     208            2 :             2,
     209            2 :             "work",
     210            2 :             &CancellationToken::new(),
     211            2 :         )
     212            2 :         .await
     213            2 :         .expect("was not cancellation")
     214            2 :         .expect_err("it was permanent error");
     215            2 : 
     216            2 :         assert_eq!(*count.lock().await, 1);
     217            2 :     }
     218              : }
        

Generated by: LCOV version 2.1-beta