Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::{
5 : controller_api::NodeRegisterRequest,
6 : shard::TenantShardId,
7 : upcall_api::{
8 : ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
9 : ValidateRequestTenant, ValidateResponse,
10 : },
11 : };
12 : use serde::{de::DeserializeOwned, Serialize};
13 : use tokio_util::sync::CancellationToken;
14 : use url::Url;
15 : use utils::{backoff, failpoint_support, generation::Generation, id::NodeId};
16 :
17 : use crate::{
18 : config::{NodeMetadata, PageServerConf},
19 : virtual_file::on_fatal_io_error,
20 : };
21 :
22 : /// The Pageserver's client for using the control plane API: this is a small subset
23 : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
24 : pub struct ControlPlaneClient {
25 : http_client: reqwest::Client,
26 : base_url: Url,
27 : node_id: NodeId,
28 : cancel: CancellationToken,
29 : }
30 :
/// Error type for operations that internally retry on every error other than their
/// cancellation token firing: the only way they can fail is `ShuttingDown`.
///
/// Derives `Debug`/`PartialEq` so callers can log it, compare it, and `unwrap`/`expect`
/// results that carry it; it is a fieldless enum, so `Copy` is free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryForeverError {
    /// The cancellation token fired before the operation succeeded.
    ShuttingDown,
}
36 :
37 : pub trait ControlPlaneGenerationsApi {
38 : fn re_attach(
39 : &self,
40 : conf: &PageServerConf,
41 : ) -> impl Future<
42 : Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
43 : > + Send;
44 : fn validate(
45 : &self,
46 : tenants: Vec<(TenantShardId, Generation)>,
47 : ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
48 : }
49 :
50 : impl ControlPlaneClient {
51 : /// A None return value indicates that the input `conf` object does not have control
52 : /// plane API enabled.
53 0 : pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
54 0 : let mut url = match conf.control_plane_api.as_ref() {
55 0 : Some(u) => u.clone(),
56 0 : None => return None,
57 : };
58 :
59 0 : if let Ok(mut segs) = url.path_segments_mut() {
60 0 : // This ensures that `url` ends with a slash if it doesn't already.
61 0 : // That way, we can subsequently use join() to safely attach extra path elements.
62 0 : segs.pop_if_empty().push("");
63 0 : }
64 :
65 0 : let mut client = reqwest::ClientBuilder::new();
66 :
67 0 : if let Some(jwt) = &conf.control_plane_api_token {
68 0 : let mut headers = hyper::HeaderMap::new();
69 0 : headers.insert(
70 0 : "Authorization",
71 0 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
72 0 : );
73 0 : client = client.default_headers(headers);
74 0 : }
75 :
76 0 : Some(Self {
77 0 : http_client: client.build().expect("Failed to construct HTTP client"),
78 0 : base_url: url,
79 0 : node_id: conf.id,
80 0 : cancel: cancel.clone(),
81 0 : })
82 0 : }
83 :
84 0 : async fn retry_http_forever<R, T>(
85 0 : &self,
86 0 : url: &url::Url,
87 0 : request: R,
88 0 : ) -> Result<T, RetryForeverError>
89 0 : where
90 0 : R: Serialize,
91 0 : T: DeserializeOwned,
92 0 : {
93 0 : let res = backoff::retry(
94 0 : || async {
95 0 : let response = self
96 0 : .http_client
97 0 : .post(url.clone())
98 0 : .json(&request)
99 0 : .send()
100 0 : .await?;
101 :
102 0 : response.error_for_status_ref()?;
103 0 : response.json::<T>().await
104 0 : },
105 0 : |_| false,
106 0 : 3,
107 0 : u32::MAX,
108 0 : "calling control plane generation validation API",
109 0 : &self.cancel,
110 0 : )
111 0 : .await
112 0 : .ok_or(RetryForeverError::ShuttingDown)?
113 0 : .expect("We retry forever, this should never be reached");
114 0 :
115 0 : Ok(res)
116 0 : }
117 : }
118 :
119 : impl ControlPlaneGenerationsApi for ControlPlaneClient {
120 : /// Block until we get a successful response, or error out if we are shut down
121 0 : async fn re_attach(
122 0 : &self,
123 0 : conf: &PageServerConf,
124 0 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
125 0 : let re_attach_path = self
126 0 : .base_url
127 0 : .join("re-attach")
128 0 : .expect("Failed to build re-attach path");
129 0 :
130 0 : // Include registration content in the re-attach request if a metadata file is readable
131 0 : let metadata_path = conf.metadata_path();
132 0 : let register = match tokio::fs::read_to_string(&metadata_path).await {
133 0 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
134 0 : Ok(m) => {
135 0 : // Since we run one time at startup, be generous in our logging and
136 0 : // dump all metadata.
137 0 : tracing::info!(
138 0 : "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
139 0 : m.postgres_host,
140 0 : m.postgres_port,
141 0 : m.http_host,
142 0 : m.http_port,
143 0 : m.other
144 0 : );
145 :
146 0 : Some(NodeRegisterRequest {
147 0 : node_id: conf.id,
148 0 : listen_pg_addr: m.postgres_host,
149 0 : listen_pg_port: m.postgres_port,
150 0 : listen_http_addr: m.http_host,
151 0 : listen_http_port: m.http_port,
152 0 : })
153 : }
154 0 : Err(e) => {
155 0 : tracing::error!("Unreadable metadata in {metadata_path}: {e}");
156 0 : None
157 : }
158 : },
159 0 : Err(e) => {
160 0 : if e.kind() == std::io::ErrorKind::NotFound {
161 : // This is legal: we may have been deployed with some external script
162 : // doing registration for us.
163 0 : tracing::info!("Metadata file not found at {metadata_path}");
164 : } else {
165 0 : on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
166 : }
167 0 : None
168 : }
169 : };
170 :
171 0 : let request = ReAttachRequest {
172 0 : node_id: self.node_id,
173 0 : register,
174 0 : };
175 :
176 0 : fail::fail_point!("control-plane-client-re-attach");
177 :
178 0 : let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
179 0 : tracing::info!(
180 0 : "Received re-attach response with {} tenants",
181 0 : response.tenants.len()
182 0 : );
183 :
184 0 : Ok(response
185 0 : .tenants
186 0 : .into_iter()
187 0 : .map(|rart| (rart.id, rart))
188 0 : .collect::<HashMap<_, _>>())
189 0 : }
190 :
191 : /// Block until we get a successful response, or error out if we are shut down
192 0 : async fn validate(
193 0 : &self,
194 0 : tenants: Vec<(TenantShardId, Generation)>,
195 0 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
196 0 : let re_attach_path = self
197 0 : .base_url
198 0 : .join("validate")
199 0 : .expect("Failed to build validate path");
200 0 :
201 0 : let request = ValidateRequest {
202 0 : tenants: tenants
203 0 : .into_iter()
204 0 : .map(|(id, gen)| ValidateRequestTenant {
205 0 : id,
206 0 : gen: gen
207 0 : .into()
208 0 : .expect("Generation should always be valid for a Tenant doing deletions"),
209 0 : })
210 0 : .collect(),
211 0 : };
212 0 :
213 0 : failpoint_support::sleep_millis_async!("control-plane-client-validate-sleep", &self.cancel);
214 0 : if self.cancel.is_cancelled() {
215 0 : return Err(RetryForeverError::ShuttingDown);
216 0 : }
217 :
218 0 : let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
219 :
220 0 : Ok(response
221 0 : .tenants
222 0 : .into_iter()
223 0 : .map(|rt| (rt.id, rt.valid))
224 0 : .collect())
225 0 : }
226 : }
|