Line data Source code
1 : use crate::tenant_shard::ObservedState;
2 : use pageserver_api::shard::TenantShardId;
3 : use serde::{Deserialize, Serialize};
4 : use std::{collections::HashMap, time::Duration};
5 : use tokio_util::sync::CancellationToken;
6 :
7 : use hyper::Uri;
8 : use reqwest::{StatusCode, Url};
9 : use utils::{backoff, http::error::HttpErrorBody};
10 :
11 : #[derive(Debug, Clone)]
12 : pub(crate) struct PeerClient {
13 : uri: Uri,
14 : jwt: Option<String>,
15 : client: reqwest::Client,
16 : }
17 :
18 0 : #[derive(thiserror::Error, Debug)]
19 : pub(crate) enum StorageControllerPeerError {
20 : #[error("failed to deserialize error response with status code {0} at {1}: {2}")]
21 : DeserializationError(StatusCode, Url, reqwest::Error),
22 : #[error("storage controller peer API error ({0}): {1}")]
23 : ApiError(StatusCode, String),
24 : #[error("failed to send HTTP request: {0}")]
25 : SendError(reqwest::Error),
26 : #[error("Cancelled")]
27 : Cancelled,
28 : }
29 :
30 : pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
31 :
32 : pub(crate) trait ResponseErrorMessageExt: Sized {
33 : fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
34 : }
35 :
36 : impl ResponseErrorMessageExt for reqwest::Response {
37 0 : async fn error_from_body(self) -> Result<Self> {
38 0 : let status = self.status();
39 0 : if !(status.is_client_error() || status.is_server_error()) {
40 0 : return Ok(self);
41 0 : }
42 0 :
43 0 : let url = self.url().to_owned();
44 0 : Err(match self.json::<HttpErrorBody>().await {
45 0 : Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
46 0 : Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
47 : })
48 0 : }
49 : }
50 :
51 0 : #[derive(Serialize, Deserialize, Debug, Default)]
52 : pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
53 :
54 : impl PeerClient {
55 0 : pub(crate) fn new(uri: Uri, jwt: Option<String>) -> Self {
56 0 : Self {
57 0 : uri,
58 0 : jwt,
59 0 : client: reqwest::Client::new(),
60 0 : }
61 0 : }
62 :
63 0 : async fn request_step_down(&self) -> Result<GlobalObservedState> {
64 0 : let step_down_path = format!("{}control/v1/step_down", self.uri);
65 0 : let req = self.client.put(step_down_path);
66 0 : let req = if let Some(jwt) = &self.jwt {
67 0 : req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
68 : } else {
69 0 : req
70 : };
71 :
72 0 : let req = req.timeout(Duration::from_secs(2));
73 :
74 0 : let res = req
75 0 : .send()
76 0 : .await
77 0 : .map_err(StorageControllerPeerError::SendError)?;
78 0 : let response = res.error_from_body().await?;
79 :
80 0 : let status = response.status();
81 0 : let url = response.url().to_owned();
82 0 :
83 0 : response
84 0 : .json()
85 0 : .await
86 0 : .map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
87 0 : }
88 :
89 : /// Request the peer to step down and return its current observed state
90 : /// All errors are retried with exponential backoff for a maximum of 4 attempts.
91 : /// Assuming all retries are performed, the function times out after roughly 4 seconds.
92 0 : pub(crate) async fn step_down(
93 0 : &self,
94 0 : cancel: &CancellationToken,
95 0 : ) -> Result<GlobalObservedState> {
96 0 : backoff::retry(
97 0 : || self.request_step_down(),
98 0 : |_e| false,
99 0 : 2,
100 0 : 4,
101 0 : "Send step down request",
102 0 : cancel,
103 0 : )
104 0 : .await
105 0 : .ok_or_else(|| StorageControllerPeerError::Cancelled)
106 0 : .and_then(|x| x)
107 0 : }
108 : }
|