Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::config::NodeMetadata;
5 : use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
6 : use pageserver_api::models::ShardImportStatus;
7 : use pageserver_api::shard::TenantShardId;
8 : use pageserver_api::upcall_api::{
9 : PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
10 : ValidateRequest, ValidateRequestTenant, ValidateResponse,
11 : };
12 : use reqwest::Certificate;
13 : use serde::Serialize;
14 : use serde::de::DeserializeOwned;
15 : use tokio_util::sync::CancellationToken;
16 : use url::Url;
17 : use utils::generation::Generation;
18 : use utils::id::{NodeId, TimelineId};
19 : use utils::{backoff, failpoint_support};
20 :
21 : use crate::config::PageServerConf;
22 : use crate::virtual_file::on_fatal_io_error;
23 :
24 : /// The Pageserver's client for using the storage controller upcall API: this is a small API
25 : /// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
26 : pub struct StorageControllerUpcallClient {
27 : http_client: reqwest::Client,
28 : base_url: Url,
29 : node_id: NodeId,
30 : cancel: CancellationToken,
31 : }
32 :
/// Error type for operations that internally retry on every error other than the
/// cancellation token firing, which leaves [`RetryForeverError::ShuttingDown`] as
/// the only way they can fail.
pub enum RetryForeverError {
    /// The operation was abandoned because the cancellation token fired.
    ShuttingDown,
}
38 :
39 : pub trait StorageControllerUpcallApi {
40 : fn re_attach(
41 : &self,
42 : conf: &PageServerConf,
43 : ) -> impl Future<
44 : Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
45 : > + Send;
46 : fn validate(
47 : &self,
48 : tenants: Vec<(TenantShardId, Generation)>,
49 : ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
50 : fn put_timeline_import_status(
51 : &self,
52 : tenant_shard_id: TenantShardId,
53 : timeline_id: TimelineId,
54 : status: ShardImportStatus,
55 : ) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
56 : }
57 :
58 : impl StorageControllerUpcallClient {
59 : /// A None return value indicates that the input `conf` object does not have control
60 : /// plane API enabled.
61 0 : pub fn new(
62 0 : conf: &'static PageServerConf,
63 0 : cancel: &CancellationToken,
64 0 : ) -> Result<Option<Self>, reqwest::Error> {
65 0 : let mut url = match conf.control_plane_api.as_ref() {
66 0 : Some(u) => u.clone(),
67 0 : None => return Ok(None),
68 : };
69 :
70 0 : if let Ok(mut segs) = url.path_segments_mut() {
71 0 : // This ensures that `url` ends with a slash if it doesn't already.
72 0 : // That way, we can subsequently use join() to safely attach extra path elements.
73 0 : segs.pop_if_empty().push("");
74 0 : }
75 :
76 0 : let mut client = reqwest::ClientBuilder::new();
77 :
78 0 : if let Some(jwt) = &conf.control_plane_api_token {
79 0 : let mut headers = reqwest::header::HeaderMap::new();
80 0 : headers.insert(
81 0 : "Authorization",
82 0 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
83 0 : );
84 0 : client = client.default_headers(headers);
85 0 : }
86 :
87 0 : for cert in &conf.ssl_ca_certs {
88 0 : client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
89 : }
90 :
91 : Ok(Some(Self {
92 0 : http_client: client.build()?,
93 0 : base_url: url,
94 0 : node_id: conf.id,
95 0 : cancel: cancel.clone(),
96 : }))
97 0 : }
98 :
99 : #[tracing::instrument(skip_all)]
100 : async fn retry_http_forever<R, T>(
101 : &self,
102 : url: &url::Url,
103 : request: R,
104 : ) -> Result<T, RetryForeverError>
105 : where
106 : R: Serialize,
107 : T: DeserializeOwned,
108 : {
109 : let res = backoff::retry(
110 0 : || async {
111 0 : let response = self
112 0 : .http_client
113 0 : .post(url.clone())
114 0 : .json(&request)
115 0 : .send()
116 0 : .await?;
117 :
118 0 : response.error_for_status_ref()?;
119 0 : response.json::<T>().await
120 0 : },
121 0 : |_| false,
122 : 3,
123 : u32::MAX,
124 : "storage controller upcall",
125 : &self.cancel,
126 : )
127 : .await
128 : .ok_or(RetryForeverError::ShuttingDown)?
129 : .expect("We retry forever, this should never be reached");
130 :
131 : Ok(res)
132 : }
133 :
134 0 : pub(crate) fn base_url(&self) -> &Url {
135 0 : &self.base_url
136 0 : }
137 : }
138 :
139 : impl StorageControllerUpcallApi for StorageControllerUpcallClient {
140 : /// Block until we get a successful response, or error out if we are shut down
141 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
142 : async fn re_attach(
143 : &self,
144 : conf: &PageServerConf,
145 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
146 : let url = self
147 : .base_url
148 : .join("re-attach")
149 : .expect("Failed to build re-attach path");
150 :
151 : // Include registration content in the re-attach request if a metadata file is readable
152 : let metadata_path = conf.metadata_path();
153 : let register = match tokio::fs::read_to_string(&metadata_path).await {
154 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
155 : Ok(m) => {
156 : // Since we run one time at startup, be generous in our logging and
157 : // dump all metadata.
158 : tracing::info!(
159 : "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
160 : m.postgres_host,
161 : m.postgres_port,
162 : m.http_host,
163 : m.http_port,
164 : m.other
165 : );
166 :
167 : let az_id = {
168 : let az_id_from_metadata = m
169 : .other
170 : .get("availability_zone_id")
171 0 : .and_then(|jv| jv.as_str().map(|str| str.to_owned()));
172 :
173 : match az_id_from_metadata {
174 : Some(az_id) => Some(AvailabilityZone(az_id)),
175 : None => {
176 : tracing::warn!(
177 : "metadata.json does not contain an 'availability_zone_id' field"
178 : );
179 : conf.availability_zone.clone().map(AvailabilityZone)
180 : }
181 : }
182 : };
183 :
184 : if az_id.is_none() {
185 : panic!(
186 : "Availablity zone id could not be inferred from metadata.json or pageserver config"
187 : );
188 : }
189 :
190 : Some(NodeRegisterRequest {
191 : node_id: conf.id,
192 : listen_pg_addr: m.postgres_host,
193 : listen_pg_port: m.postgres_port,
194 : listen_http_addr: m.http_host,
195 : listen_http_port: m.http_port,
196 : listen_https_port: m.https_port,
197 : availability_zone_id: az_id.expect("Checked above"),
198 : })
199 : }
200 : Err(e) => {
201 : tracing::error!("Unreadable metadata in {metadata_path}: {e}");
202 : None
203 : }
204 : },
205 : Err(e) => {
206 : if e.kind() == std::io::ErrorKind::NotFound {
207 : // This is legal: we may have been deployed with some external script
208 : // doing registration for us.
209 : tracing::info!("Metadata file not found at {metadata_path}");
210 : } else {
211 : on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
212 : }
213 : None
214 : }
215 : };
216 :
217 : let request = ReAttachRequest {
218 : node_id: self.node_id,
219 : register: register.clone(),
220 : };
221 :
222 : let response: ReAttachResponse = self.retry_http_forever(&url, request).await?;
223 : tracing::info!(
224 : "Received re-attach response with {} tenants (node {}, register: {:?})",
225 : response.tenants.len(),
226 : self.node_id,
227 : register,
228 : );
229 :
230 : failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
231 :
232 : Ok(response
233 : .tenants
234 : .into_iter()
235 0 : .map(|rart| (rart.id, rart))
236 : .collect::<HashMap<_, _>>())
237 : }
238 :
239 : /// Block until we get a successful response, or error out if we are shut down
240 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
241 : async fn validate(
242 : &self,
243 : tenants: Vec<(TenantShardId, Generation)>,
244 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
245 : let url = self
246 : .base_url
247 : .join("validate")
248 : .expect("Failed to build validate path");
249 :
250 : // When sending validate requests, break them up into chunks so that we
251 : // avoid possible edge cases of generating any HTTP requests that
252 : // require database I/O across many thousands of tenants.
253 : let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
254 : for tenant_chunk in (tenants).chunks(128) {
255 : let request = ValidateRequest {
256 : tenants: tenant_chunk
257 : .iter()
258 0 : .map(|(id, generation)| ValidateRequestTenant {
259 0 : id: *id,
260 0 : r#gen: (*generation).into().expect(
261 0 : "Generation should always be valid for a Tenant doing deletions",
262 0 : ),
263 0 : })
264 : .collect(),
265 : };
266 :
267 : failpoint_support::sleep_millis_async!(
268 : "control-plane-client-validate-sleep",
269 : &self.cancel
270 : );
271 : if self.cancel.is_cancelled() {
272 : return Err(RetryForeverError::ShuttingDown);
273 : }
274 :
275 : let response: ValidateResponse = self.retry_http_forever(&url, request).await?;
276 : for rt in response.tenants {
277 : result.insert(rt.id, rt.valid);
278 : }
279 : }
280 :
281 : Ok(result.into_iter().collect())
282 : }
283 :
284 : /// Send a shard import status to the storage controller
285 : ///
286 : /// The implementation must have at-least-once delivery semantics.
287 : /// To this end, we retry the request until it succeeds. If the pageserver
288 : /// restarts or crashes, the shard import will start again from the beggining.
289 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
290 : async fn put_timeline_import_status(
291 : &self,
292 : tenant_shard_id: TenantShardId,
293 : timeline_id: TimelineId,
294 : status: ShardImportStatus,
295 : ) -> Result<(), RetryForeverError> {
296 : let url = self
297 : .base_url
298 : .join("timeline_import_status")
299 : .expect("Failed to build path");
300 :
301 : let request = PutTimelineImportStatusRequest {
302 : tenant_shard_id,
303 : timeline_id,
304 : status,
305 : };
306 :
307 : self.retry_http_forever(&url, request).await
308 : }
309 : }
|