use std::collections::HashMap;
use std::net::IpAddr;

use futures::Future;
use pageserver_api::config::NodeMetadata;
use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{
    PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
    TimelineImportStatusRequest, ValidateRequest, ValidateRequestTenant, ValidateResponse,
};
use reqwest::Certificate;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::{backoff, failpoint_support, ip_address};

use crate::config::PageServerConf;
use crate::virtual_file::on_fatal_io_error;

/// The Pageserver's client for using the storage controller upcall API: this is a small API
/// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
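///
/// Illustrative usage, a sketch rather than a compiled doctest (it assumes a
/// `'static` `PageServerConf` and a `CancellationToken` are already in hand,
/// and that the `StorageControllerUpcallApi` trait is in scope):
///
/// ```ignore
/// let client = StorageControllerUpcallClient::new(conf, &cancel);
/// // On startup, learn which tenant shards to attach and their generations.
/// let tenants = client.re_attach(conf, /* empty_local_disk */ false).await?;
/// ```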
pub struct StorageControllerUpcallClient {
    http_client: reqwest::Client,
    base_url: Url,
    node_id: NodeId,
    node_ip_addr: Option<IpAddr>,
    cancel: CancellationToken,
}

/// Represents operations that internally retry on all errors other than the
/// cancellation token firing: the only way they can fail is `ShuttingDown`.
pub enum RetryForeverError {
    ShuttingDown,
}

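/// The upcall operations the pageserver performs against the storage controller.
/// All of them retry internally and can only fail with [`RetryForeverError::ShuttingDown`].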
pub trait StorageControllerUpcallApi {
    fn re_attach(
        &self,
        conf: &PageServerConf,
        empty_local_disk: bool,
    ) -> impl Future<
        Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
    > + Send;
    fn validate(
        &self,
        tenants: Vec<(TenantShardId, Generation)>,
    ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
    fn put_timeline_import_status(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        generation: Generation,
        status: ShardImportStatus,
    ) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
    fn get_timeline_import_status(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        generation: Generation,
    ) -> impl Future<Output = Result<ShardImportStatus, RetryForeverError>> + Send;
}

impl StorageControllerUpcallClient {
    /// Construct the client from the pageserver configuration.
    /// `conf.control_plane_api` must point at the storage controller's upcall
    /// API; this panics if the HTTP client, a configured CA certificate, or the
    /// node IP address environment variable is invalid.
    pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Self {
        let mut url = conf.control_plane_api.clone();

        if let Ok(mut segs) = url.path_segments_mut() {
            // This ensures that `url` ends with a slash if it doesn't already.
            // That way, we can subsequently use join() to safely attach extra path elements.
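            // E.g. "http://controller/upcall/v1" becomes ".../upcall/v1/": without
            // the trailing slash, join("re-attach") would *replace* the final path
            // segment instead of appending to it.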
            segs.pop_if_empty().push("");
        }

        let mut client = reqwest::ClientBuilder::new();

        if let Some(jwt) = &conf.control_plane_api_token {
            let mut headers = reqwest::header::HeaderMap::new();
            headers.insert(
                "Authorization",
                format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
            );
            client = client.default_headers(headers);
        }

        for cert in &conf.ssl_ca_certs {
            client = client.add_root_certificate(
                Certificate::from_der(cert.contents()).expect("Invalid certificate in config"),
            );
        }

        // Intentionally panics if we encountered any errors parsing or reading the IP address.
        // Note that if the required environment variable is not set, `read_node_ip_addr_from_env`
        // returns `Ok(None)` instead of an error.
        let node_ip_addr =
            ip_address::read_node_ip_addr_from_env().expect("Error reading node IP address.");

        Self {
            http_client: client.build().expect("Failed to construct HTTP client"),
            base_url: url,
            node_id: conf.id,
            cancel: cancel.clone(),
            node_ip_addr,
        }
    }

    #[tracing::instrument(skip_all)]
    async fn retry_http_forever<R, T>(
        &self,
        url: &url::Url,
        request: R,
        method: reqwest::Method,
    ) -> Result<T, RetryForeverError>
    where
        R: Serialize,
        T: DeserializeOwned,
    {
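        // Loop until we get a successful response or shut down: `|_| false` below
        // treats no error as permanent, and `u32::MAX` retries means the only exit
        // paths are success or the cancellation token firing (`ShuttingDown`).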
        let res = backoff::retry(
            || async {
                let response = self
                    .http_client
                    .request(method.clone(), url.clone())
                    .json(&request)
                    .send()
                    .await?;

                response.error_for_status_ref()?;
                response.json::<T>().await
            },
            |_| false,
            3,
            u32::MAX,
            "storage controller upcall",
            &self.cancel,
        )
        .await
        .ok_or(RetryForeverError::ShuttingDown)?
        .expect("We retry forever, this should never be reached");

        Ok(res)
    }

    pub(crate) fn base_url(&self) -> &Url {
        &self.base_url
    }
}

impl StorageControllerUpcallApi for StorageControllerUpcallClient {
    /// Block until we get a successful response, or error out if we are shut down.
    #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
    async fn re_attach(
        &self,
        conf: &PageServerConf,
        empty_local_disk: bool,
    ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
        let url = self
            .base_url
            .join("re-attach")
            .expect("Failed to build re-attach path");

        // Include registration content in the re-attach request if a metadata file is readable.
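        // The metadata file carries this node's listen addresses (used to build
        // the `NodeRegisterRequest` below) plus an `other` map that may include
        // an "availability_zone_id" entry.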
        let metadata_path = conf.metadata_path();
        let register = match tokio::fs::read_to_string(&metadata_path).await {
            Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
                Ok(m) => {
                    // Since we run one time at startup, be generous in our logging and
                    // dump all metadata.
                    tracing::info!("Loaded node metadata: {m}");

                    let az_id = {
                        let az_id_from_metadata = m
                            .other
                            .get("availability_zone_id")
                            .and_then(|jv| jv.as_str().map(|str| str.to_owned()));

                        match az_id_from_metadata {
                            Some(az_id) => Some(AvailabilityZone(az_id)),
                            None => {
                                tracing::warn!(
                                    "metadata.json does not contain an 'availability_zone_id' field"
                                );
                                conf.availability_zone.clone().map(AvailabilityZone)
                            }
                        }
                    };

                    if az_id.is_none() {
                        panic!(
                            "Availability zone id could not be inferred from metadata.json or pageserver config"
                        );
                    }

                    Some(NodeRegisterRequest {
                        node_id: conf.id,
                        listen_pg_addr: m.postgres_host,
                        listen_pg_port: m.postgres_port,
                        listen_grpc_addr: m.grpc_host,
                        listen_grpc_port: m.grpc_port,
                        listen_http_addr: m.http_host,
                        listen_http_port: m.http_port,
                        listen_https_port: m.https_port,
                        node_ip_addr: self.node_ip_addr,
                        availability_zone_id: az_id.expect("Checked above"),
                    })
                }
                Err(e) => {
                    tracing::error!("Unreadable metadata in {metadata_path}: {e}");
                    None
                }
            },
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    // This is legal: we may have been deployed with some external script
                    // doing registration for us.
                    tracing::info!("Metadata file not found at {metadata_path}");
                } else {
                    on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
                }
                None
            }
        };

        let request = ReAttachRequest {
            node_id: self.node_id,
            register: register.clone(),
            empty_local_disk: Some(empty_local_disk),
        };

        let response: ReAttachResponse = self
            .retry_http_forever(&url, request, reqwest::Method::POST)
            .await?;
        tracing::info!(
            "Received re-attach response with {} tenants (node {}, register: {:?})",
            response.tenants.len(),
            self.node_id,
            register,
        );

        failpoint_support::sleep_millis_async!("control-plane-client-re-attach");

        Ok(response
            .tenants
            .into_iter()
            .map(|rart| (rart.id, rart))
            .collect::<HashMap<_, _>>())
    }

    /// Block until we get a successful response, or error out if we are shut down.
    #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
    async fn validate(
        &self,
        tenants: Vec<(TenantShardId, Generation)>,
    ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
        let url = self
            .base_url
            .join("validate")
            .expect("Failed to build validate path");

        // Send validate requests in chunks, so that a single HTTP request never
        // requires the controller to perform database I/O across many thousands
        // of tenants at once.
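        // A chunk size of 128 keeps each request body, and the controller-side
        // work per call, bounded.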
        let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
        for tenant_chunk in tenants.chunks(128) {
            let request = ValidateRequest {
                tenants: tenant_chunk
                    .iter()
                    .map(|(id, generation)| ValidateRequestTenant {
                        id: *id,
                        r#gen: (*generation).into().expect(
                            "Generation should always be valid for a Tenant doing deletions",
                        ),
                    })
                    .collect(),
            };

            failpoint_support::sleep_millis_async!(
                "control-plane-client-validate-sleep",
                &self.cancel
            );
            if self.cancel.is_cancelled() {
                return Err(RetryForeverError::ShuttingDown);
            }

            let response: ValidateResponse = self
                .retry_http_forever(&url, request, reqwest::Method::POST)
                .await?;
            for rt in response.tenants {
                result.insert(rt.id, rt.valid);
            }
        }

        Ok(result)
    }

    /// Send a shard import status to the storage controller.
    ///
    /// The implementation must have at-least-once delivery semantics.
    /// To this end, we retry the request until it succeeds. If the pageserver
    /// restarts or crashes, the shard import will start again from the beginning.
    #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
    async fn put_timeline_import_status(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        generation: Generation,
        status: ShardImportStatus,
    ) -> Result<(), RetryForeverError> {
        let url = self
            .base_url
            .join("timeline_import_status")
            .expect("Failed to build path");

        let request = PutTimelineImportStatusRequest {
            tenant_shard_id,
            timeline_id,
            generation,
            status,
        };

        self.retry_http_forever(&url, request, reqwest::Method::POST)
            .await
    }

    #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
    async fn get_timeline_import_status(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        generation: Generation,
    ) -> Result<ShardImportStatus, RetryForeverError> {
        let url = self
            .base_url
            .join("timeline_import_status")
            .expect("Failed to build path");

        let request = TimelineImportStatusRequest {
            tenant_shard_id,
            timeline_id,
            generation,
        };

        let response: ShardImportStatus = self
            .retry_http_forever(&url, request, reqwest::Method::GET)
            .await?;
        Ok(response)
    }
}