Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::config::NodeMetadata;
5 : use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
6 : use pageserver_api::models::ShardImportStatus;
7 : use pageserver_api::shard::TenantShardId;
8 : use pageserver_api::upcall_api::{
9 : PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
10 : TimelineImportStatusRequest, ValidateRequest, ValidateRequestTenant, ValidateResponse,
11 : };
12 : use reqwest::Certificate;
13 : use serde::Serialize;
14 : use serde::de::DeserializeOwned;
15 : use tokio_util::sync::CancellationToken;
16 : use url::Url;
17 : use utils::generation::Generation;
18 : use utils::id::{NodeId, TimelineId};
19 : use utils::{backoff, failpoint_support};
20 :
21 : use crate::config::PageServerConf;
22 : use crate::virtual_file::on_fatal_io_error;
23 :
24 : /// The Pageserver's client for using the storage controller upcall API: this is a small API
25 : /// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
26 : pub struct StorageControllerUpcallClient {
27 : http_client: reqwest::Client,
28 : base_url: Url,
29 : node_id: NodeId,
30 : cancel: CancellationToken,
31 : }
32 :
/// Error type for operations that internally retry on every failure except
/// cancellation.
///
/// Because all other errors are retried, the only way such an operation can
/// fail is [`RetryForeverError::ShuttingDown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryForeverError {
    /// The cancellation token fired before the operation could succeed.
    ShuttingDown,
}
38 :
39 : pub trait StorageControllerUpcallApi {
40 : fn re_attach(
41 : &self,
42 : conf: &PageServerConf,
43 : ) -> impl Future<
44 : Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
45 : > + Send;
46 : fn validate(
47 : &self,
48 : tenants: Vec<(TenantShardId, Generation)>,
49 : ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
50 : fn put_timeline_import_status(
51 : &self,
52 : tenant_shard_id: TenantShardId,
53 : timeline_id: TimelineId,
54 : generation: Generation,
55 : status: ShardImportStatus,
56 : ) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
57 : fn get_timeline_import_status(
58 : &self,
59 : tenant_shard_id: TenantShardId,
60 : timeline_id: TimelineId,
61 : generation: Generation,
62 : ) -> impl Future<Output = Result<ShardImportStatus, RetryForeverError>> + Send;
63 : }
64 :
65 : impl StorageControllerUpcallClient {
66 : /// A None return value indicates that the input `conf` object does not have control
67 : /// plane API enabled.
68 0 : pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Self {
69 0 : let mut url = conf.control_plane_api.clone();
70 :
71 0 : if let Ok(mut segs) = url.path_segments_mut() {
72 0 : // This ensures that `url` ends with a slash if it doesn't already.
73 0 : // That way, we can subsequently use join() to safely attach extra path elements.
74 0 : segs.pop_if_empty().push("");
75 0 : }
76 :
77 0 : let mut client = reqwest::ClientBuilder::new();
78 :
79 0 : if let Some(jwt) = &conf.control_plane_api_token {
80 0 : let mut headers = reqwest::header::HeaderMap::new();
81 0 : headers.insert(
82 0 : "Authorization",
83 0 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
84 0 : );
85 0 : client = client.default_headers(headers);
86 0 : }
87 :
88 0 : for cert in &conf.ssl_ca_certs {
89 0 : client = client.add_root_certificate(
90 0 : Certificate::from_der(cert.contents()).expect("Invalid certificate in config"),
91 0 : );
92 0 : }
93 :
94 0 : Self {
95 0 : http_client: client.build().expect("Failed to construct HTTP client"),
96 0 : base_url: url,
97 0 : node_id: conf.id,
98 0 : cancel: cancel.clone(),
99 0 : }
100 0 : }
101 :
102 : #[tracing::instrument(skip_all)]
103 : async fn retry_http_forever<R, T>(
104 : &self,
105 : url: &url::Url,
106 : request: R,
107 : method: reqwest::Method,
108 : ) -> Result<T, RetryForeverError>
109 : where
110 : R: Serialize,
111 : T: DeserializeOwned,
112 : {
113 : let res = backoff::retry(
114 0 : || async {
115 0 : let response = self
116 0 : .http_client
117 0 : .request(method.clone(), url.clone())
118 0 : .json(&request)
119 0 : .send()
120 0 : .await?;
121 :
122 0 : response.error_for_status_ref()?;
123 0 : response.json::<T>().await
124 0 : },
125 0 : |_| false,
126 : 3,
127 : u32::MAX,
128 : "storage controller upcall",
129 : &self.cancel,
130 : )
131 : .await
132 : .ok_or(RetryForeverError::ShuttingDown)?
133 : .expect("We retry forever, this should never be reached");
134 :
135 : Ok(res)
136 : }
137 :
138 0 : pub(crate) fn base_url(&self) -> &Url {
139 0 : &self.base_url
140 0 : }
141 : }
142 :
143 : impl StorageControllerUpcallApi for StorageControllerUpcallClient {
144 : /// Block until we get a successful response, or error out if we are shut down
145 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
146 : async fn re_attach(
147 : &self,
148 : conf: &PageServerConf,
149 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
150 : let url = self
151 : .base_url
152 : .join("re-attach")
153 : .expect("Failed to build re-attach path");
154 :
155 : // Include registration content in the re-attach request if a metadata file is readable
156 : let metadata_path = conf.metadata_path();
157 : let register = match tokio::fs::read_to_string(&metadata_path).await {
158 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
159 : Ok(m) => {
160 : // Since we run one time at startup, be generous in our logging and
161 : // dump all metadata.
162 : tracing::info!(
163 : "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
164 : m.postgres_host,
165 : m.postgres_port,
166 : m.http_host,
167 : m.http_port,
168 : m.other
169 : );
170 :
171 : let az_id = {
172 : let az_id_from_metadata = m
173 : .other
174 : .get("availability_zone_id")
175 0 : .and_then(|jv| jv.as_str().map(|str| str.to_owned()));
176 :
177 : match az_id_from_metadata {
178 : Some(az_id) => Some(AvailabilityZone(az_id)),
179 : None => {
180 : tracing::warn!(
181 : "metadata.json does not contain an 'availability_zone_id' field"
182 : );
183 : conf.availability_zone.clone().map(AvailabilityZone)
184 : }
185 : }
186 : };
187 :
188 : if az_id.is_none() {
189 : panic!(
190 : "Availablity zone id could not be inferred from metadata.json or pageserver config"
191 : );
192 : }
193 :
194 : Some(NodeRegisterRequest {
195 : node_id: conf.id,
196 : listen_pg_addr: m.postgres_host,
197 : listen_pg_port: m.postgres_port,
198 : listen_http_addr: m.http_host,
199 : listen_http_port: m.http_port,
200 : listen_https_port: m.https_port,
201 : availability_zone_id: az_id.expect("Checked above"),
202 : })
203 : }
204 : Err(e) => {
205 : tracing::error!("Unreadable metadata in {metadata_path}: {e}");
206 : None
207 : }
208 : },
209 : Err(e) => {
210 : if e.kind() == std::io::ErrorKind::NotFound {
211 : // This is legal: we may have been deployed with some external script
212 : // doing registration for us.
213 : tracing::info!("Metadata file not found at {metadata_path}");
214 : } else {
215 : on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
216 : }
217 : None
218 : }
219 : };
220 :
221 : let request = ReAttachRequest {
222 : node_id: self.node_id,
223 : register: register.clone(),
224 : };
225 :
226 : let response: ReAttachResponse = self
227 : .retry_http_forever(&url, request, reqwest::Method::POST)
228 : .await?;
229 : tracing::info!(
230 : "Received re-attach response with {} tenants (node {}, register: {:?})",
231 : response.tenants.len(),
232 : self.node_id,
233 : register,
234 : );
235 :
236 : failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
237 :
238 : Ok(response
239 : .tenants
240 : .into_iter()
241 0 : .map(|rart| (rart.id, rart))
242 : .collect::<HashMap<_, _>>())
243 : }
244 :
245 : /// Block until we get a successful response, or error out if we are shut down
246 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
247 : async fn validate(
248 : &self,
249 : tenants: Vec<(TenantShardId, Generation)>,
250 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
251 : let url = self
252 : .base_url
253 : .join("validate")
254 : .expect("Failed to build validate path");
255 :
256 : // When sending validate requests, break them up into chunks so that we
257 : // avoid possible edge cases of generating any HTTP requests that
258 : // require database I/O across many thousands of tenants.
259 : let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
260 : for tenant_chunk in (tenants).chunks(128) {
261 : let request = ValidateRequest {
262 : tenants: tenant_chunk
263 : .iter()
264 0 : .map(|(id, generation)| ValidateRequestTenant {
265 0 : id: *id,
266 0 : r#gen: (*generation).into().expect(
267 0 : "Generation should always be valid for a Tenant doing deletions",
268 0 : ),
269 0 : })
270 : .collect(),
271 : };
272 :
273 : failpoint_support::sleep_millis_async!(
274 : "control-plane-client-validate-sleep",
275 : &self.cancel
276 : );
277 : if self.cancel.is_cancelled() {
278 : return Err(RetryForeverError::ShuttingDown);
279 : }
280 :
281 : let response: ValidateResponse = self
282 : .retry_http_forever(&url, request, reqwest::Method::POST)
283 : .await?;
284 : for rt in response.tenants {
285 : result.insert(rt.id, rt.valid);
286 : }
287 : }
288 :
289 : Ok(result.into_iter().collect())
290 : }
291 :
292 : /// Send a shard import status to the storage controller
293 : ///
294 : /// The implementation must have at-least-once delivery semantics.
295 : /// To this end, we retry the request until it succeeds. If the pageserver
296 : /// restarts or crashes, the shard import will start again from the beggining.
297 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
298 : async fn put_timeline_import_status(
299 : &self,
300 : tenant_shard_id: TenantShardId,
301 : timeline_id: TimelineId,
302 : generation: Generation,
303 : status: ShardImportStatus,
304 : ) -> Result<(), RetryForeverError> {
305 : let url = self
306 : .base_url
307 : .join("timeline_import_status")
308 : .expect("Failed to build path");
309 :
310 : let request = PutTimelineImportStatusRequest {
311 : tenant_shard_id,
312 : timeline_id,
313 : generation,
314 : status,
315 : };
316 :
317 : self.retry_http_forever(&url, request, reqwest::Method::POST)
318 : .await
319 : }
320 :
321 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
322 : async fn get_timeline_import_status(
323 : &self,
324 : tenant_shard_id: TenantShardId,
325 : timeline_id: TimelineId,
326 : generation: Generation,
327 : ) -> Result<ShardImportStatus, RetryForeverError> {
328 : let url = self
329 : .base_url
330 : .join("timeline_import_status")
331 : .expect("Failed to build path");
332 :
333 : let request = TimelineImportStatusRequest {
334 : tenant_shard_id,
335 : timeline_id,
336 : generation,
337 : };
338 :
339 : let response: ShardImportStatus = self
340 : .retry_http_forever(&url, request, reqwest::Method::GET)
341 : .await?;
342 : Ok(response)
343 : }
344 : }
|