Line data Source code
1 : use std::collections::HashMap;
2 : use std::error::Error as _;
3 : use std::time::Duration;
4 :
5 : use http_utils::error::HttpErrorBody;
6 : use hyper::Uri;
7 : use pageserver_api::shard::TenantShardId;
8 : use reqwest::{StatusCode, Url};
9 : use serde::{Deserialize, Serialize};
10 : use tokio_util::sync::CancellationToken;
11 : use utils::backoff;
12 :
13 : use crate::tenant_shard::ObservedState;
14 :
15 : #[derive(Debug, Clone)]
16 : pub(crate) struct PeerClient {
17 : uri: Uri,
18 : jwt: Option<String>,
19 : client: reqwest::Client,
20 : }
21 :
22 : #[derive(thiserror::Error, Debug)]
23 : pub(crate) enum StorageControllerPeerError {
24 : #[error(
25 : "failed to deserialize error response with status code {0} at {1}: {2}{}",
26 0 : .2.source().map(|e| format!(": {e}")).unwrap_or_default()
27 : )]
28 : DeserializationError(StatusCode, Url, reqwest::Error),
29 : #[error("storage controller peer API error ({0}): {1}")]
30 : ApiError(StatusCode, String),
31 0 : #[error("failed to send HTTP request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
32 : SendError(reqwest::Error),
33 : #[error("Cancelled")]
34 : Cancelled,
35 : }
36 :
37 : pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
38 :
39 : pub(crate) trait ResponseErrorMessageExt: Sized {
40 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
41 : }
42 :
43 : impl ResponseErrorMessageExt for reqwest::Response {
44 0 : async fn error_from_body(self) -> Result<Self> {
45 0 : let status = self.status();
46 0 : if !(status.is_client_error() || status.is_server_error()) {
47 0 : return Ok(self);
48 0 : }
49 0 :
50 0 : let url = self.url().to_owned();
51 0 : Err(match self.json::<HttpErrorBody>().await {
52 0 : Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
53 0 : Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
54 : })
55 0 : }
56 : }
57 :
58 0 : #[derive(Serialize, Deserialize, Debug, Default, Clone)]
59 : pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
60 :
/// Retry budget passed to `backoff::retry` for step-down requests.
const STEP_DOWN_RETRIES: u32 = 8;
/// Per-attempt HTTP timeout applied to each step-down request.
const STEP_DOWN_TIMEOUT: Duration = Duration::from_millis(1_000);
63 :
64 : impl PeerClient {
65 0 : pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
66 0 : Self {
67 0 : uri,
68 0 : jwt,
69 0 : client: http_client,
70 0 : }
71 0 : }
72 :
73 0 : async fn request_step_down(&self) -> Result<GlobalObservedState> {
74 0 : let step_down_path = format!("{}control/v1/step_down", self.uri);
75 0 : let req = self.client.put(step_down_path);
76 0 : let req = if let Some(jwt) = &self.jwt {
77 0 : req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
78 : } else {
79 0 : req
80 : };
81 :
82 0 : let req = req.timeout(STEP_DOWN_TIMEOUT);
83 :
84 0 : let res = req
85 0 : .send()
86 0 : .await
87 0 : .map_err(StorageControllerPeerError::SendError)?;
88 0 : let response = res.error_from_body().await?;
89 :
90 0 : let status = response.status();
91 0 : let url = response.url().to_owned();
92 0 :
93 0 : response
94 0 : .json()
95 0 : .await
96 0 : .map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
97 0 : }
98 :
99 : /// Request the peer to step down and return its current observed state
100 : /// All errors are re-tried
101 0 : pub(crate) async fn step_down(
102 0 : &self,
103 0 : cancel: &CancellationToken,
104 0 : ) -> Result<GlobalObservedState> {
105 0 : backoff::retry(
106 0 : || self.request_step_down(),
107 0 : |_e| false,
108 0 : 2,
109 0 : STEP_DOWN_RETRIES,
110 0 : "Send step down request",
111 0 : cancel,
112 0 : )
113 0 : .await
114 0 : .ok_or_else(|| StorageControllerPeerError::Cancelled)
115 0 : .and_then(|x| x)
116 0 : }
117 : }
|