Line data Source code
1 : use std::collections::HashMap;
2 : use std::error::Error as _;
3 : use std::time::Duration;
4 :
5 : use http_utils::error::HttpErrorBody;
6 : use hyper::Uri;
7 : use pageserver_api::shard::TenantShardId;
8 : use reqwest::{StatusCode, Url};
9 : use serde::{Deserialize, Serialize};
10 : use tokio_util::sync::CancellationToken;
11 : use utils::backoff;
12 :
13 : use crate::tenant_shard::ObservedState;
14 :
15 : #[derive(Debug, Clone)]
16 : pub(crate) struct PeerClient {
17 : uri: Uri,
18 : jwt: Option<String>,
19 : client: reqwest::Client,
20 : }
21 :
22 : #[derive(thiserror::Error, Debug)]
23 : pub(crate) enum StorageControllerPeerError {
24 : #[error(
25 : "failed to deserialize error response with status code {0} at {1}: {2}{}",
26 0 : .2.source().map(|e| format!(": {e}")).unwrap_or_default()
27 : )]
28 : DeserializationError(StatusCode, Url, reqwest::Error),
29 : #[error("storage controller peer API error ({0}): {1}")]
30 : ApiError(StatusCode, String),
31 0 : #[error("failed to send HTTP request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
32 : SendError(reqwest::Error),
33 : #[error("Cancelled")]
34 : Cancelled,
35 : }
36 :
37 : pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
38 :
39 : pub(crate) trait ResponseErrorMessageExt: Sized {
40 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
41 : }
42 :
43 : impl ResponseErrorMessageExt for reqwest::Response {
44 0 : async fn error_from_body(self) -> Result<Self> {
45 0 : let status = self.status();
46 0 : if !(status.is_client_error() || status.is_server_error()) {
47 0 : return Ok(self);
48 0 : }
49 0 :
50 0 : let url = self.url().to_owned();
51 0 : Err(match self.json::<HttpErrorBody>().await {
52 0 : Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
53 0 : Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
54 : })
55 0 : }
56 : }
57 :
58 0 : #[derive(Serialize, Deserialize, Debug, Default)]
59 : pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
60 :
61 : impl PeerClient {
62 0 : pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
63 0 : Self {
64 0 : uri,
65 0 : jwt,
66 0 : client: reqwest::Client::new(),
67 0 : }
68 0 : }
69 :
70 0 : async fn request_step_down(&self) -> Result<GlobalObservedState> {
71 0 : let step_down_path = format!("{}control/v1/step_down", self.uri);
72 0 : let req = self.client.put(step_down_path);
73 0 : let req = if let Some(jwt) = &self.jwt {
74 0 : req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
75 : } else {
76 0 : req
77 : };
78 :
79 0 : let req = req.timeout(Duration::from_secs(2));
80 :
81 0 : let res = req
82 0 : .send()
83 0 : .await
84 0 : .map_err(StorageControllerPeerError::SendError)?;
85 0 : let response = res.error_from_body().await?;
86 :
87 0 : let status = response.status();
88 0 : let url = response.url().to_owned();
89 0 :
90 0 : response
91 0 : .json()
92 0 : .await
93 0 : .map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
94 0 : }
95 :
96 : /// Request the peer to step down and return its current observed state
97 : /// All errors are retried with exponential backoff for a maximum of 4 attempts.
98 : /// Assuming all retries are performed, the function times out after roughly 4 seconds.
99 0 : pub(crate) async fn step_down(
100 0 : &self,
101 0 : cancel: &CancellationToken,
102 0 : ) -> Result<GlobalObservedState> {
103 0 : backoff::retry(
104 0 : || self.request_step_down(),
105 0 : |_e| false,
106 0 : 2,
107 0 : 4,
108 0 : "Send step down request",
109 0 : cancel,
110 0 : )
111 0 : .await
112 0 : .ok_or_else(|| StorageControllerPeerError::Cancelled)
113 0 : .and_then(|x| x)
114 0 : }
115 : }
|