Line data Source code
1 : use crate::tenant_shard::ObservedState;
2 : use pageserver_api::shard::TenantShardId;
3 : use serde::{Deserialize, Serialize};
4 : use std::collections::HashMap;
5 : use std::error::Error as _;
6 : use std::time::Duration;
7 : use tokio_util::sync::CancellationToken;
8 :
9 : use http_utils::error::HttpErrorBody;
10 : use hyper::Uri;
11 : use reqwest::{StatusCode, Url};
12 : use utils::backoff;
13 :
/// HTTP client used to talk to another storage controller instance (a "peer").
#[derive(Debug, Clone)]
pub(crate) struct PeerClient {
    /// Base URI of the peer; request paths are appended directly to its string form.
    uri: Uri,
    /// Optional token; when present it is sent as an `Authorization: Bearer <jwt>` header.
    jwt: Option<String>,
    /// Underlying `reqwest` client used to issue the requests.
    client: reqwest::Client,
}
20 :
/// Errors produced while talking to a storage controller peer.
#[derive(thiserror::Error, Debug)]
pub(crate) enum StorageControllerPeerError {
    /// A response body could not be deserialized. Carries the response status,
    /// the request URL and the underlying `reqwest` error (whose `source()`, if
    /// any, is appended to the message).
    // NOTE(review): the message says "error response", but this variant is also
    // constructed when a non-error response body fails to deserialize — confirm
    // whether the wording should be generalized.
    #[error(
        "failed to deserialize error response with status code {0} at {1}: {2}{}",
        .2.source().map(|e| format!(": {e}")).unwrap_or_default()
    )]
    DeserializationError(StatusCode, Url, reqwest::Error),
    /// The peer answered with a client/server error status and a parseable
    /// error body; carries the status and the body's message.
    #[error("storage controller peer API error ({0}): {1}")]
    ApiError(StatusCode, String),
    /// Sending the HTTP request failed; the error's `source()`, if any, is
    /// appended to the message.
    #[error("failed to send HTTP request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
    SendError(reqwest::Error),
    /// The operation was cancelled before it produced a result.
    #[error("Cancelled")]
    Cancelled,
}
35 :
/// Result alias whose error type is [`StorageControllerPeerError`].
pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
37 :
/// Extension trait for turning a response value into a [`Result`], so that
/// failed responses can be propagated with `?`.
pub(crate) trait ResponseErrorMessageExt: Sized {
    /// Consumes `self` and resolves to `Ok(self)` or to a
    /// [`StorageControllerPeerError`]. The returned future must be `Send`.
    fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
}
41 :
42 : impl ResponseErrorMessageExt for reqwest::Response {
43 0 : async fn error_from_body(self) -> Result<Self> {
44 0 : let status = self.status();
45 0 : if !(status.is_client_error() || status.is_server_error()) {
46 0 : return Ok(self);
47 0 : }
48 0 :
49 0 : let url = self.url().to_owned();
50 0 : Err(match self.json::<HttpErrorBody>().await {
51 0 : Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
52 0 : Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
53 : })
54 0 : }
55 : }
56 :
/// Serializable newtype around a map from tenant shard id to that shard's
/// [`ObservedState`].
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
59 :
60 : impl PeerClient {
61 0 : pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
62 0 : Self {
63 0 : uri,
64 0 : jwt,
65 0 : client: reqwest::Client::new(),
66 0 : }
67 0 : }
68 :
69 0 : async fn request_step_down(&self) -> Result<GlobalObservedState> {
70 0 : let step_down_path = format!("{}control/v1/step_down", self.uri);
71 0 : let req = self.client.put(step_down_path);
72 0 : let req = if let Some(jwt) = &self.jwt {
73 0 : req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
74 : } else {
75 0 : req
76 : };
77 :
78 0 : let req = req.timeout(Duration::from_secs(2));
79 :
80 0 : let res = req
81 0 : .send()
82 0 : .await
83 0 : .map_err(StorageControllerPeerError::SendError)?;
84 0 : let response = res.error_from_body().await?;
85 :
86 0 : let status = response.status();
87 0 : let url = response.url().to_owned();
88 0 :
89 0 : response
90 0 : .json()
91 0 : .await
92 0 : .map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
93 0 : }
94 :
95 : /// Request the peer to step down and return its current observed state
96 : /// All errors are retried with exponential backoff for a maximum of 4 attempts.
97 : /// Assuming all retries are performed, the function times out after roughly 4 seconds.
98 0 : pub(crate) async fn step_down(
99 0 : &self,
100 0 : cancel: &CancellationToken,
101 0 : ) -> Result<GlobalObservedState> {
102 0 : backoff::retry(
103 0 : || self.request_step_down(),
104 0 : |_e| false,
105 0 : 2,
106 0 : 4,
107 0 : "Send step down request",
108 0 : cancel,
109 0 : )
110 0 : .await
111 0 : .ok_or_else(|| StorageControllerPeerError::Cancelled)
112 0 : .and_then(|x| x)
113 0 : }
114 : }
|