TLA Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use pageserver_api::control_api::{
4 : ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
5 : };
6 : use serde::{de::DeserializeOwned, Serialize};
7 : use tokio_util::sync::CancellationToken;
8 : use url::Url;
9 : use utils::{
10 : backoff,
11 : generation::Generation,
12 : id::{NodeId, TenantId},
13 : };
14 :
15 : use crate::config::PageServerConf;
16 :
17 : /// The Pageserver's client for using the control plane API: this is a small subset
18 : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
19 : pub struct ControlPlaneClient {
20 : http_client: reqwest::Client,
21 : base_url: Url,
22 : node_id: NodeId,
23 : cancel: CancellationToken,
24 : }
25 :
/// Error type for operations which internally retry on all errors other than
/// the cancellation token firing: the only way they can fail is
/// [`RetryForeverError::ShuttingDown`].
///
/// `Debug`, `PartialEq` and `Eq` are derived so that results carrying this
/// error can be logged, unwrapped and compared in assertions.
#[derive(Debug, PartialEq, Eq)]
pub enum RetryForeverError {
    /// The cancellation token fired before the operation succeeded.
    ShuttingDown,
}
31 :
32 : #[async_trait::async_trait]
33 : pub trait ControlPlaneGenerationsApi {
34 : async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError>;
35 : async fn validate(
36 : &self,
37 : tenants: Vec<(TenantId, Generation)>,
38 : ) -> Result<HashMap<TenantId, bool>, RetryForeverError>;
39 : }
40 :
41 : impl ControlPlaneClient {
42 : /// A None return value indicates that the input `conf` object does not have control
43 : /// plane API enabled.
44 CBC 1119 : pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
45 1119 : let mut url = match conf.control_plane_api.as_ref() {
46 61 : Some(u) => u.clone(),
47 1058 : None => return None,
48 : };
49 :
50 61 : if let Ok(mut segs) = url.path_segments_mut() {
51 61 : // This ensures that `url` ends with a slash if it doesn't already.
52 61 : // That way, we can subsequently use join() to safely attach extra path elements.
53 61 : segs.pop_if_empty().push("");
54 61 : }
55 :
56 61 : let mut client = reqwest::ClientBuilder::new();
57 :
58 61 : if let Some(jwt) = &conf.control_plane_api_token {
59 UBC 0 : let mut headers = hyper::HeaderMap::new();
60 0 : headers.insert("Authorization", jwt.get_contents().parse().unwrap());
61 0 : client = client.default_headers(headers);
62 CBC 61 : }
63 :
64 61 : Some(Self {
65 61 : http_client: client.build().expect("Failed to construct HTTP client"),
66 61 : base_url: url,
67 61 : node_id: conf.id,
68 61 : cancel: cancel.clone(),
69 61 : })
70 1119 : }
71 :
72 40 : async fn retry_http_forever<R, T>(
73 40 : &self,
74 40 : url: &url::Url,
75 40 : request: R,
76 40 : ) -> Result<T, RetryForeverError>
77 40 : where
78 40 : R: Serialize,
79 40 : T: DeserializeOwned,
80 40 : {
81 40 : #[derive(thiserror::Error, Debug)]
82 40 : enum RemoteAttemptError {
83 40 : #[error("shutdown")]
84 40 : Shutdown,
85 40 : #[error("remote: {0}")]
86 40 : Remote(reqwest::Error),
87 40 : }
88 40 :
89 40 : match backoff::retry(
90 48 : || async {
91 48 : let response = self
92 48 : .http_client
93 48 : .post(url.clone())
94 48 : .json(&request)
95 48 : .send()
96 120 : .await
97 48 : .map_err(RemoteAttemptError::Remote)?;
98 :
99 39 : response
100 39 : .error_for_status_ref()
101 39 : .map_err(RemoteAttemptError::Remote)?;
102 39 : response
103 39 : .json::<T>()
104 UBC 0 : .await
105 CBC 39 : .map_err(RemoteAttemptError::Remote)
106 48 : },
107 40 : |_| false,
108 40 : 3,
109 40 : u32::MAX,
110 40 : "calling control plane generation validation API",
111 40 : backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown),
112 40 : )
113 129 : .await
114 : {
115 1 : Err(RemoteAttemptError::Shutdown) => Err(RetryForeverError::ShuttingDown),
116 : Err(RemoteAttemptError::Remote(_)) => {
117 UBC 0 : panic!("We retry forever, this should never be reached");
118 : }
119 CBC 39 : Ok(r) => Ok(r),
120 : }
121 40 : }
122 : }
123 :
124 : #[async_trait::async_trait]
125 : impl ControlPlaneGenerationsApi for ControlPlaneClient {
126 : /// Block until we get a successful response, or error out if we are shut down
127 30 : async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError> {
128 30 : let re_attach_path = self
129 30 : .base_url
130 30 : .join("re-attach")
131 30 : .expect("Failed to build re-attach path");
132 30 : let request = ReAttachRequest {
133 30 : node_id: self.node_id,
134 30 : };
135 30 :
136 30 : fail::fail_point!("control-plane-client-re-attach");
137 :
138 88 : let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
139 30 : tracing::info!(
140 30 : "Received re-attach response with {} tenants",
141 30 : response.tenants.len()
142 30 : );
143 :
144 30 : Ok(response
145 30 : .tenants
146 30 : .into_iter()
147 30 : .map(|t| (t.id, Generation::new(t.generation)))
148 30 : .collect::<HashMap<_, _>>())
149 60 : }
150 :
151 : /// Block until we get a successful response, or error out if we are shut down
152 10 : async fn validate(
153 10 : &self,
154 10 : tenants: Vec<(TenantId, Generation)>,
155 10 : ) -> Result<HashMap<TenantId, bool>, RetryForeverError> {
156 10 : let re_attach_path = self
157 10 : .base_url
158 10 : .join("validate")
159 10 : .expect("Failed to build validate path");
160 10 :
161 10 : let request = ValidateRequest {
162 10 : tenants: tenants
163 10 : .into_iter()
164 10 : .map(|(id, gen)| ValidateRequestTenant {
165 10 : id,
166 10 : gen: gen
167 10 : .into()
168 10 : .expect("Generation should always be valid for a Tenant doing deletions"),
169 10 : })
170 10 : .collect(),
171 10 : };
172 10 :
173 10 : fail::fail_point!("control-plane-client-validate");
174 :
175 41 : let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
176 :
177 9 : Ok(response
178 9 : .tenants
179 9 : .into_iter()
180 9 : .map(|rt| (rt.id, rt.valid))
181 9 : .collect())
182 20 : }
183 : }
|