TLA Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use pageserver_api::{
4 : control_api::{
5 : ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
6 : },
7 : shard::TenantShardId,
8 : };
9 : use serde::{de::DeserializeOwned, Serialize};
10 : use tokio_util::sync::CancellationToken;
11 : use url::Url;
12 : use utils::{backoff, generation::Generation, id::NodeId};
13 :
14 : use crate::config::PageServerConf;
15 :
16 : /// The Pageserver's client for using the control plane API: this is a small subset
17 : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
18 : pub struct ControlPlaneClient {
19 : http_client: reqwest::Client,
20 : base_url: Url,
21 : node_id: NodeId,
22 : cancel: CancellationToken,
23 : }
24 :
/// Represent operations which internally retry on all errors other than
/// cancellation token firing: the only way they can fail is ShuttingDown.
///
/// Derives `Debug` so callers can log it with `{:?}` and use `unwrap`/`expect`
/// on results carrying this error.
#[derive(Debug)]
pub enum RetryForeverError {
    /// The cancellation token fired before a successful response was obtained.
    ShuttingDown,
}
30 :
31 : #[async_trait::async_trait]
32 : pub trait ControlPlaneGenerationsApi {
33 : async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError>;
34 : async fn validate(
35 : &self,
36 : tenants: Vec<(TenantShardId, Generation)>,
37 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError>;
38 : }
39 :
40 : impl ControlPlaneClient {
41 : /// A None return value indicates that the input `conf` object does not have control
42 : /// plane API enabled.
43 CBC 1113 : pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
44 1113 : let mut url = match conf.control_plane_api.as_ref() {
45 1111 : Some(u) => u.clone(),
46 2 : None => return None,
47 : };
48 :
49 1111 : if let Ok(mut segs) = url.path_segments_mut() {
50 1111 : // This ensures that `url` ends with a slash if it doesn't already.
51 1111 : // That way, we can subsequently use join() to safely attach extra path elements.
52 1111 : segs.pop_if_empty().push("");
53 1111 : }
54 :
55 1111 : let mut client = reqwest::ClientBuilder::new();
56 :
57 1111 : if let Some(jwt) = &conf.control_plane_api_token {
58 UBC 0 : let mut headers = hyper::HeaderMap::new();
59 0 : headers.insert(
60 0 : "Authorization",
61 0 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
62 0 : );
63 0 : client = client.default_headers(headers);
64 CBC 1111 : }
65 :
66 1111 : Some(Self {
67 1111 : http_client: client.build().expect("Failed to construct HTTP client"),
68 1111 : base_url: url,
69 1111 : node_id: conf.id,
70 1111 : cancel: cancel.clone(),
71 1111 : })
72 1113 : }
73 :
74 1000 : async fn retry_http_forever<R, T>(
75 1000 : &self,
76 1000 : url: &url::Url,
77 1000 : request: R,
78 1000 : ) -> Result<T, RetryForeverError>
79 1000 : where
80 1000 : R: Serialize,
81 1000 : T: DeserializeOwned,
82 1000 : {
83 1000 : #[derive(thiserror::Error, Debug)]
84 1000 : enum RemoteAttemptError {
85 1000 : #[error("shutdown")]
86 1000 : Shutdown,
87 1000 : #[error("remote: {0}")]
88 1000 : Remote(reqwest::Error),
89 1000 : }
90 1000 :
91 1000 : match backoff::retry(
92 1005 : || async {
93 1005 : let response = self
94 1005 : .http_client
95 1005 : .post(url.clone())
96 1005 : .json(&request)
97 1005 : .send()
98 2573 : .await
99 1005 : .map_err(RemoteAttemptError::Remote)?;
100 :
101 999 : response
102 999 : .error_for_status_ref()
103 999 : .map_err(RemoteAttemptError::Remote)?;
104 999 : response
105 999 : .json::<T>()
106 66 : .await
107 999 : .map_err(RemoteAttemptError::Remote)
108 1005 : },
109 1000 : |_| false,
110 1000 : 3,
111 1000 : u32::MAX,
112 1000 : "calling control plane generation validation API",
113 1000 : backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown),
114 1000 : )
115 2645 : .await
116 : {
117 1 : Err(RemoteAttemptError::Shutdown) => Err(RetryForeverError::ShuttingDown),
118 : Err(RemoteAttemptError::Remote(_)) => {
119 UBC 0 : panic!("We retry forever, this should never be reached");
120 : }
121 CBC 999 : Ok(r) => Ok(r),
122 : }
123 1000 : }
124 : }
125 :
126 : #[async_trait::async_trait]
127 : impl ControlPlaneGenerationsApi for ControlPlaneClient {
128 : /// Block until we get a successful response, or error out if we are shut down
129 555 : async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
130 555 : let re_attach_path = self
131 555 : .base_url
132 555 : .join("re-attach")
133 555 : .expect("Failed to build re-attach path");
134 555 : let request = ReAttachRequest {
135 555 : node_id: self.node_id,
136 555 : };
137 :
138 UBC 0 : fail::fail_point!("control-plane-client-re-attach");
139 :
140 CBC 1686 : let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
141 555 : tracing::info!(
142 555 : "Received re-attach response with {} tenants",
143 555 : response.tenants.len()
144 555 : );
145 :
146 555 : Ok(response
147 555 : .tenants
148 555 : .into_iter()
149 555 : .map(|t| (t.id, Generation::new(t.gen)))
150 555 : .collect::<HashMap<_, _>>())
151 1110 : }
152 :
153 : /// Block until we get a successful response, or error out if we are shut down
154 447 : async fn validate(
155 447 : &self,
156 447 : tenants: Vec<(TenantShardId, Generation)>,
157 447 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
158 447 : let re_attach_path = self
159 447 : .base_url
160 447 : .join("validate")
161 447 : .expect("Failed to build validate path");
162 447 :
163 447 : let request = ValidateRequest {
164 447 : tenants: tenants
165 447 : .into_iter()
166 514 : .map(|(id, gen)| ValidateRequestTenant {
167 514 : id,
168 514 : gen: gen
169 514 : .into()
170 514 : .expect("Generation should always be valid for a Tenant doing deletions"),
171 514 : })
172 447 : .collect(),
173 447 : };
174 :
175 UBC 0 : fail::fail_point!("control-plane-client-validate");
176 :
177 CBC 959 : let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
178 :
179 444 : Ok(response
180 444 : .tenants
181 444 : .into_iter()
182 511 : .map(|rt| (rt.id, rt.valid))
183 444 : .collect())
184 892 : }
185 : }
|