Line data Source code
1 : use crate::tenant_shard::ObservedState;
2 : use pageserver_api::shard::TenantShardId;
3 : use serde::{Deserialize, Serialize};
4 : use std::collections::HashMap;
5 : use std::error::Error as _;
6 : use std::time::Duration;
7 : use tokio_util::sync::CancellationToken;
8 :
9 : use hyper::Uri;
10 : use reqwest::{StatusCode, Url};
11 : use utils::{backoff, http::error::HttpErrorBody};
12 :
13 : #[derive(Debug, Clone)]
14 : pub(crate) struct PeerClient {
15 : uri: Uri,
16 : jwt: Option<String>,
17 : client: reqwest::Client,
18 : }
19 :
20 : #[derive(thiserror::Error, Debug)]
21 : pub(crate) enum StorageControllerPeerError {
22 : #[error(
23 : "failed to deserialize error response with status code {0} at {1}: {2}{}",
24 0 : .2.source().map(|e| format!(": {e}")).unwrap_or_default()
25 : )]
26 : DeserializationError(StatusCode, Url, reqwest::Error),
27 : #[error("storage controller peer API error ({0}): {1}")]
28 : ApiError(StatusCode, String),
29 0 : #[error("failed to send HTTP request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
30 : SendError(reqwest::Error),
31 : #[error("Cancelled")]
32 : Cancelled,
33 : }
34 :
35 : pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
36 :
37 : pub(crate) trait ResponseErrorMessageExt: Sized {
38 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
39 : }
40 :
41 : impl ResponseErrorMessageExt for reqwest::Response {
42 0 : async fn error_from_body(self) -> Result<Self> {
43 0 : let status = self.status();
44 0 : if !(status.is_client_error() || status.is_server_error()) {
45 0 : return Ok(self);
46 0 : }
47 0 :
48 0 : let url = self.url().to_owned();
49 0 : Err(match self.json::<HttpErrorBody>().await {
50 0 : Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
51 0 : Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
52 : })
53 0 : }
54 : }
55 :
56 0 : #[derive(Serialize, Deserialize, Debug, Default)]
57 : pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
58 :
59 : impl PeerClient {
60 0 : pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
61 0 : Self {
62 0 : uri,
63 0 : jwt,
64 0 : client: reqwest::Client::new(),
65 0 : }
66 0 : }
67 :
68 0 : async fn request_step_down(&self) -> Result<GlobalObservedState> {
69 0 : let step_down_path = format!("{}control/v1/step_down", self.uri);
70 0 : let req = self.client.put(step_down_path);
71 0 : let req = if let Some(jwt) = &self.jwt {
72 0 : req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
73 : } else {
74 0 : req
75 : };
76 :
77 0 : let req = req.timeout(Duration::from_secs(2));
78 :
79 0 : let res = req
80 0 : .send()
81 0 : .await
82 0 : .map_err(StorageControllerPeerError::SendError)?;
83 0 : let response = res.error_from_body().await?;
84 :
85 0 : let status = response.status();
86 0 : let url = response.url().to_owned();
87 0 :
88 0 : response
89 0 : .json()
90 0 : .await
91 0 : .map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
92 0 : }
93 :
94 : /// Request the peer to step down and return its current observed state
95 : /// All errors are retried with exponential backoff for a maximum of 4 attempts.
96 : /// Assuming all retries are performed, the function times out after roughly 4 seconds.
97 0 : pub(crate) async fn step_down(
98 0 : &self,
99 0 : cancel: &CancellationToken,
100 0 : ) -> Result<GlobalObservedState> {
101 0 : backoff::retry(
102 0 : || self.request_step_down(),
103 0 : |_e| false,
104 0 : 2,
105 0 : 4,
106 0 : "Send step down request",
107 0 : cancel,
108 0 : )
109 0 : .await
110 0 : .ok_or_else(|| StorageControllerPeerError::Cancelled)
111 0 : .and_then(|x| x)
112 0 : }
113 : }
|