Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::config::NodeMetadata;
5 : use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
6 : use pageserver_api::shard::TenantShardId;
7 : use pageserver_api::upcall_api::{
8 : ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
9 : ValidateRequestTenant, ValidateResponse,
10 : };
11 : use reqwest::Certificate;
12 : use serde::Serialize;
13 : use serde::de::DeserializeOwned;
14 : use tokio_util::sync::CancellationToken;
15 : use url::Url;
16 : use utils::generation::Generation;
17 : use utils::id::NodeId;
18 : use utils::{backoff, failpoint_support};
19 :
20 : use crate::config::PageServerConf;
21 : use crate::virtual_file::on_fatal_io_error;
22 :
23 : /// The Pageserver's client for using the storage controller upcall API: this is a small API
24 : /// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
25 : pub struct StorageControllerUpcallClient {
26 : http_client: reqwest::Client,
27 : base_url: Url,
28 : node_id: NodeId,
29 : cancel: CancellationToken,
30 : }
31 :
/// Error type for operations that retry internally on every failure.
///
/// Because such operations keep retrying until they succeed, the only way for them
/// to return an error is for the cancellation token to fire.
pub enum RetryForeverError {
    /// The cancellation token fired before the operation completed.
    ShuttingDown,
}
37 :
38 : pub trait StorageControllerUpcallApi {
39 : fn re_attach(
40 : &self,
41 : conf: &PageServerConf,
42 : ) -> impl Future<
43 : Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
44 : > + Send;
45 : fn validate(
46 : &self,
47 : tenants: Vec<(TenantShardId, Generation)>,
48 : ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
49 : }
50 :
51 : impl StorageControllerUpcallClient {
52 : /// A None return value indicates that the input `conf` object does not have control
53 : /// plane API enabled.
54 0 : pub fn new(
55 0 : conf: &'static PageServerConf,
56 0 : cancel: &CancellationToken,
57 0 : ) -> Result<Option<Self>, reqwest::Error> {
58 0 : let mut url = match conf.control_plane_api.as_ref() {
59 0 : Some(u) => u.clone(),
60 0 : None => return Ok(None),
61 : };
62 :
63 0 : if let Ok(mut segs) = url.path_segments_mut() {
64 0 : // This ensures that `url` ends with a slash if it doesn't already.
65 0 : // That way, we can subsequently use join() to safely attach extra path elements.
66 0 : segs.pop_if_empty().push("");
67 0 : }
68 :
69 0 : let mut client = reqwest::ClientBuilder::new();
70 :
71 0 : if let Some(jwt) = &conf.control_plane_api_token {
72 0 : let mut headers = reqwest::header::HeaderMap::new();
73 0 : headers.insert(
74 0 : "Authorization",
75 0 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
76 0 : );
77 0 : client = client.default_headers(headers);
78 0 : }
79 :
80 0 : for cert in &conf.ssl_ca_certs {
81 0 : client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
82 : }
83 :
84 : Ok(Some(Self {
85 0 : http_client: client.build()?,
86 0 : base_url: url,
87 0 : node_id: conf.id,
88 0 : cancel: cancel.clone(),
89 : }))
90 0 : }
91 :
92 : #[tracing::instrument(skip_all)]
93 : async fn retry_http_forever<R, T>(
94 : &self,
95 : url: &url::Url,
96 : request: R,
97 : ) -> Result<T, RetryForeverError>
98 : where
99 : R: Serialize,
100 : T: DeserializeOwned,
101 : {
102 : let res = backoff::retry(
103 0 : || async {
104 0 : let response = self
105 0 : .http_client
106 0 : .post(url.clone())
107 0 : .json(&request)
108 0 : .send()
109 0 : .await?;
110 :
111 0 : response.error_for_status_ref()?;
112 0 : response.json::<T>().await
113 0 : },
114 0 : |_| false,
115 : 3,
116 : u32::MAX,
117 : "storage controller upcall",
118 : &self.cancel,
119 : )
120 : .await
121 : .ok_or(RetryForeverError::ShuttingDown)?
122 : .expect("We retry forever, this should never be reached");
123 :
124 : Ok(res)
125 : }
126 :
127 0 : pub(crate) fn base_url(&self) -> &Url {
128 0 : &self.base_url
129 0 : }
130 : }
131 :
132 : impl StorageControllerUpcallApi for StorageControllerUpcallClient {
133 : /// Block until we get a successful response, or error out if we are shut down
134 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
135 : async fn re_attach(
136 : &self,
137 : conf: &PageServerConf,
138 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
139 : let url = self
140 : .base_url
141 : .join("re-attach")
142 : .expect("Failed to build re-attach path");
143 :
144 : // Include registration content in the re-attach request if a metadata file is readable
145 : let metadata_path = conf.metadata_path();
146 : let register = match tokio::fs::read_to_string(&metadata_path).await {
147 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
148 : Ok(m) => {
149 : // Since we run one time at startup, be generous in our logging and
150 : // dump all metadata.
151 : tracing::info!(
152 : "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
153 : m.postgres_host,
154 : m.postgres_port,
155 : m.http_host,
156 : m.http_port,
157 : m.other
158 : );
159 :
160 : let az_id = {
161 : let az_id_from_metadata = m
162 : .other
163 : .get("availability_zone_id")
164 0 : .and_then(|jv| jv.as_str().map(|str| str.to_owned()));
165 :
166 : match az_id_from_metadata {
167 : Some(az_id) => Some(AvailabilityZone(az_id)),
168 : None => {
169 : tracing::warn!(
170 : "metadata.json does not contain an 'availability_zone_id' field"
171 : );
172 : conf.availability_zone.clone().map(AvailabilityZone)
173 : }
174 : }
175 : };
176 :
177 : if az_id.is_none() {
178 : panic!(
179 : "Availablity zone id could not be inferred from metadata.json or pageserver config"
180 : );
181 : }
182 :
183 : Some(NodeRegisterRequest {
184 : node_id: conf.id,
185 : listen_pg_addr: m.postgres_host,
186 : listen_pg_port: m.postgres_port,
187 : listen_http_addr: m.http_host,
188 : listen_http_port: m.http_port,
189 : listen_https_port: m.https_port,
190 : availability_zone_id: az_id.expect("Checked above"),
191 : })
192 : }
193 : Err(e) => {
194 : tracing::error!("Unreadable metadata in {metadata_path}: {e}");
195 : None
196 : }
197 : },
198 : Err(e) => {
199 : if e.kind() == std::io::ErrorKind::NotFound {
200 : // This is legal: we may have been deployed with some external script
201 : // doing registration for us.
202 : tracing::info!("Metadata file not found at {metadata_path}");
203 : } else {
204 : on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
205 : }
206 : None
207 : }
208 : };
209 :
210 : let request = ReAttachRequest {
211 : node_id: self.node_id,
212 : register: register.clone(),
213 : };
214 :
215 : let response: ReAttachResponse = self.retry_http_forever(&url, request).await?;
216 : tracing::info!(
217 : "Received re-attach response with {} tenants (node {}, register: {:?})",
218 : response.tenants.len(),
219 : self.node_id,
220 : register,
221 : );
222 :
223 : failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
224 :
225 : Ok(response
226 : .tenants
227 : .into_iter()
228 0 : .map(|rart| (rart.id, rart))
229 : .collect::<HashMap<_, _>>())
230 : }
231 :
232 : /// Block until we get a successful response, or error out if we are shut down
233 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
234 : async fn validate(
235 : &self,
236 : tenants: Vec<(TenantShardId, Generation)>,
237 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
238 : let url = self
239 : .base_url
240 : .join("validate")
241 : .expect("Failed to build validate path");
242 :
243 : // When sending validate requests, break them up into chunks so that we
244 : // avoid possible edge cases of generating any HTTP requests that
245 : // require database I/O across many thousands of tenants.
246 : let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
247 : for tenant_chunk in (tenants).chunks(128) {
248 : let request = ValidateRequest {
249 : tenants: tenant_chunk
250 : .iter()
251 0 : .map(|(id, generation)| ValidateRequestTenant {
252 0 : id: *id,
253 0 : r#gen: (*generation).into().expect(
254 0 : "Generation should always be valid for a Tenant doing deletions",
255 0 : ),
256 0 : })
257 : .collect(),
258 : };
259 :
260 : failpoint_support::sleep_millis_async!(
261 : "control-plane-client-validate-sleep",
262 : &self.cancel
263 : );
264 : if self.cancel.is_cancelled() {
265 : return Err(RetryForeverError::ShuttingDown);
266 : }
267 :
268 : let response: ValidateResponse = self.retry_http_forever(&url, request).await?;
269 : for rt in response.tenants {
270 : result.insert(rt.id, rt.valid);
271 : }
272 : }
273 :
274 : Ok(result.into_iter().collect())
275 : }
276 : }
|