use std::time::Duration;

use tokio::time::Instant;
use tracing::{error, info, warn};

use utils::backoff::exponential_backoff_duration;

/// A retry handler for Pageserver gRPC requests.
///
/// This is used instead of backoff::retry for better control and observability.
pub struct Retry;

impl Retry {
    /// The per-request timeout.
    // TODO: tune these, and/or make them configurable. Should we retry forever?
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
    /// The total timeout across all attempts.
    const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
    /// The initial backoff duration.
    const BASE_BACKOFF: Duration = Duration::from_millis(10);
    /// The maximum backoff duration.
    const MAX_BACKOFF: Duration = Duration::from_secs(10);
    /// If true, log successful requests. For debugging.
    const LOG_SUCCESS: bool = false;

    /// Runs the given async closure with timeouts and retries (exponential backoff), passing the
    /// attempt number starting at 0. Logs errors, using the current tracing span for context.
    ///
    /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
    /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
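    ///
    /// # Example
    ///
    /// A minimal sketch of a call site; `client`, `get_page`, and `req` are hypothetical
    /// stand-ins for an actual gRPC client, method, and request, not part of this module:
    ///
    /// ```ignore
    /// let response = Retry
    ///     .with(|attempt| {
    ///         // Hypothetical tonic client; cloned per attempt since calls take `&mut self`.
    ///         let mut client = client.clone();
    ///         let req = req.clone();
    ///         async move { client.get_page(req).await }
    ///     })
    ///     .await?;
    /// ```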
    pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
    where
        F: FnMut(usize) -> O, // takes attempt number, starting at 0
        O: Future<Output = tonic::Result<T>>,
    {
        let started = Instant::now();
        let deadline = started + Self::TOTAL_TIMEOUT;
        let mut last_error = None;
        let mut retries = 0;
        loop {
            // Set up a future to wait for the backoff (if any) and run the request with a timeout.
            let backoff_and_try = async {
                // NB: sleep() always sleeps 1ms, even when given a 0 argument. See:
                // https://github.com/tokio-rs/tokio/issues/6866
                if let Some(backoff) = Self::backoff_duration(retries) {
                    tokio::time::sleep(backoff).await;
                }

                let request_started = Instant::now();
                tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries))
                    .await
                    .map_err(|_| {
                        tonic::Status::deadline_exceeded(format!(
                            "request timed out after {:.3}s",
                            request_started.elapsed().as_secs_f64()
                        ))
                    })?
            };

            // Wait for the backoff and request, or bail out if the total timeout is exceeded.
            let result = tokio::select! {
                result = backoff_and_try => result,

                _ = tokio::time::sleep_until(deadline) => {
                    let last_error = last_error.unwrap_or_else(|| {
                        tonic::Status::deadline_exceeded(format!(
                            "request timed out after {:.3}s",
                            started.elapsed().as_secs_f64()
                        ))
                    });
                    error!(
                        "giving up after {:.3}s and {retries} retries, last error {:?}: {}",
                        started.elapsed().as_secs_f64(), last_error.code(), last_error.message(),
                    );
                    return Err(last_error);
                }
            };

            match result {
                // Success, return the result.
                Ok(result) => {
                    if retries > 0 || Self::LOG_SUCCESS {
                        info!(
                            "request succeeded after {retries} retries in {:.3}s",
                            started.elapsed().as_secs_f64(),
                        );
                    }

                    return Ok(result);
                }

                // Error, retry or bail out.
                Err(status) => {
                    let (code, message) = (status.code(), status.message());
                    let attempt = retries + 1;

                    if !Self::should_retry(code) {
                        // NB: include the attempt here too. This isn't necessarily the first
                        // attempt, because the error may change between attempts.
                        error!(
                            "request failed with {code:?}: {message}, not retrying (attempt {attempt})"
                        );
                        return Err(status);
                    }

                    warn!("request failed with {code:?}: {message}, retrying (attempt {attempt})");

                    retries += 1;
                    last_error = Some(status);
                }
            }
        }
    }

    /// Returns the backoff duration for the given retry attempt, or None for no backoff.
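    ///
    /// For example (assuming `exponential_backoff_duration` yields zero for attempt 0, growing
    /// exponentially from [`Self::BASE_BACKOFF`] and capped at [`Self::MAX_BACKOFF`]):
    ///
    /// ```ignore
    /// assert_eq!(Retry::backoff_duration(0), None); // first attempt: no backoff
    /// assert!(Retry::backoff_duration(9).unwrap() <= Retry::MAX_BACKOFF); // always capped
    /// ```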
    fn backoff_duration(retry: usize) -> Option<Duration> {
        let backoff = exponential_backoff_duration(
            retry as u32,
            Self::BASE_BACKOFF.as_secs_f64(),
            Self::MAX_BACKOFF.as_secs_f64(),
        );
        (!backoff.is_zero()).then_some(backoff)
    }

    /// Returns true if the given status code should be retried.
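    ///
    /// For example:
    ///
    /// ```ignore
    /// assert!(Retry::should_retry(tonic::Code::Unavailable)); // transient, retried
    /// assert!(!Retry::should_retry(tonic::Code::NotFound)); // permanent, not retried
    /// ```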
    fn should_retry(code: tonic::Code) -> bool {
        match code {
            tonic::Code::Ok => panic!("unexpected Ok status code"),

            // These codes are transient, so retry them.
            tonic::Code::Aborted => true,
            tonic::Code::Cancelled => true,
            tonic::Code::DeadlineExceeded => true, // maybe transient slowness
            tonic::Code::ResourceExhausted => true,
            tonic::Code::Unavailable => true,

            // The following codes will likely continue to fail, so don't retry.
            tonic::Code::AlreadyExists => false,
            tonic::Code::DataLoss => false,
            tonic::Code::FailedPrecondition => false,
            // NB: don't retry Internal. It is intended for serious errors such as invariant
            // violations, and is also used for client-side invariant checks that would otherwise
            // result in retry loops.
            tonic::Code::Internal => false,
            tonic::Code::InvalidArgument => false,
            tonic::Code::NotFound => false,
            tonic::Code::OutOfRange => false,
            tonic::Code::PermissionDenied => false,
            tonic::Code::Unauthenticated => false,
            tonic::Code::Unimplemented => false,
            tonic::Code::Unknown => false,
        }
    }
}