Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::config::NodeMetadata;
5 : use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
6 : use pageserver_api::shard::TenantShardId;
7 : use pageserver_api::upcall_api::{
8 : ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
9 : ValidateRequestTenant, ValidateResponse,
10 : };
11 : use serde::Serialize;
12 : use serde::de::DeserializeOwned;
13 : use tokio_util::sync::CancellationToken;
14 : use url::Url;
15 : use utils::generation::Generation;
16 : use utils::id::NodeId;
17 : use utils::{backoff, failpoint_support};
18 :
19 : use crate::config::PageServerConf;
20 : use crate::virtual_file::on_fatal_io_error;
21 :
22 : /// The Pageserver's client for using the storage controller upcall API: this is a small API
23 : /// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
24 : pub struct StorageControllerUpcallClient {
25 : http_client: reqwest::Client,
26 : base_url: Url,
27 : node_id: NodeId,
28 : cancel: CancellationToken,
29 : }
30 :
/// Error type of operations that internally retry on every error other than the
/// cancellation token firing: the only way such an operation can fail is
/// [`RetryForeverError::ShuttingDown`].
pub enum RetryForeverError {
    /// The operation was abandoned because shutdown was requested.
    ShuttingDown,
}
36 :
37 : pub trait StorageControllerUpcallApi {
38 : fn re_attach(
39 : &self,
40 : conf: &PageServerConf,
41 : ) -> impl Future<
42 : Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
43 : > + Send;
44 : fn validate(
45 : &self,
46 : tenants: Vec<(TenantShardId, Generation)>,
47 : ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
48 : }
49 :
50 : impl StorageControllerUpcallClient {
51 : /// A None return value indicates that the input `conf` object does not have control
52 : /// plane API enabled.
53 0 : pub fn new(
54 0 : conf: &'static PageServerConf,
55 0 : cancel: &CancellationToken,
56 0 : ) -> Result<Option<Self>, reqwest::Error> {
57 0 : let mut url = match conf.control_plane_api.as_ref() {
58 0 : Some(u) => u.clone(),
59 0 : None => return Ok(None),
60 : };
61 :
62 0 : if let Ok(mut segs) = url.path_segments_mut() {
63 0 : // This ensures that `url` ends with a slash if it doesn't already.
64 0 : // That way, we can subsequently use join() to safely attach extra path elements.
65 0 : segs.pop_if_empty().push("");
66 0 : }
67 :
68 0 : let mut client = reqwest::ClientBuilder::new();
69 :
70 0 : if let Some(jwt) = &conf.control_plane_api_token {
71 0 : let mut headers = reqwest::header::HeaderMap::new();
72 0 : headers.insert(
73 0 : "Authorization",
74 0 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
75 0 : );
76 0 : client = client.default_headers(headers);
77 0 : }
78 :
79 0 : for ssl_ca_cert in &conf.ssl_ca_certs {
80 0 : client = client.add_root_certificate(ssl_ca_cert.clone());
81 0 : }
82 :
83 : Ok(Some(Self {
84 0 : http_client: client.build()?,
85 0 : base_url: url,
86 0 : node_id: conf.id,
87 0 : cancel: cancel.clone(),
88 : }))
89 0 : }
90 :
91 : #[tracing::instrument(skip_all)]
92 : async fn retry_http_forever<R, T>(
93 : &self,
94 : url: &url::Url,
95 : request: R,
96 : ) -> Result<T, RetryForeverError>
97 : where
98 : R: Serialize,
99 : T: DeserializeOwned,
100 : {
101 : let res = backoff::retry(
102 0 : || async {
103 0 : let response = self
104 0 : .http_client
105 0 : .post(url.clone())
106 0 : .json(&request)
107 0 : .send()
108 0 : .await?;
109 :
110 0 : response.error_for_status_ref()?;
111 0 : response.json::<T>().await
112 0 : },
113 0 : |_| false,
114 : 3,
115 : u32::MAX,
116 : "storage controller upcall",
117 : &self.cancel,
118 : )
119 : .await
120 : .ok_or(RetryForeverError::ShuttingDown)?
121 : .expect("We retry forever, this should never be reached");
122 :
123 : Ok(res)
124 : }
125 :
126 0 : pub(crate) fn base_url(&self) -> &Url {
127 0 : &self.base_url
128 0 : }
129 : }
130 :
131 : impl StorageControllerUpcallApi for StorageControllerUpcallClient {
132 : /// Block until we get a successful response, or error out if we are shut down
133 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
134 : async fn re_attach(
135 : &self,
136 : conf: &PageServerConf,
137 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
138 : let url = self
139 : .base_url
140 : .join("re-attach")
141 : .expect("Failed to build re-attach path");
142 :
143 : // Include registration content in the re-attach request if a metadata file is readable
144 : let metadata_path = conf.metadata_path();
145 : let register = match tokio::fs::read_to_string(&metadata_path).await {
146 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
147 : Ok(m) => {
148 : // Since we run one time at startup, be generous in our logging and
149 : // dump all metadata.
150 : tracing::info!(
151 : "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
152 : m.postgres_host,
153 : m.postgres_port,
154 : m.http_host,
155 : m.http_port,
156 : m.other
157 : );
158 :
159 : let az_id = {
160 : let az_id_from_metadata = m
161 : .other
162 : .get("availability_zone_id")
163 0 : .and_then(|jv| jv.as_str().map(|str| str.to_owned()));
164 :
165 : match az_id_from_metadata {
166 : Some(az_id) => Some(AvailabilityZone(az_id)),
167 : None => {
168 : tracing::warn!(
169 : "metadata.json does not contain an 'availability_zone_id' field"
170 : );
171 : conf.availability_zone.clone().map(AvailabilityZone)
172 : }
173 : }
174 : };
175 :
176 : if az_id.is_none() {
177 : panic!(
178 : "Availablity zone id could not be inferred from metadata.json or pageserver config"
179 : );
180 : }
181 :
182 : Some(NodeRegisterRequest {
183 : node_id: conf.id,
184 : listen_pg_addr: m.postgres_host,
185 : listen_pg_port: m.postgres_port,
186 : listen_http_addr: m.http_host,
187 : listen_http_port: m.http_port,
188 : listen_https_port: m.https_port,
189 : availability_zone_id: az_id.expect("Checked above"),
190 : })
191 : }
192 : Err(e) => {
193 : tracing::error!("Unreadable metadata in {metadata_path}: {e}");
194 : None
195 : }
196 : },
197 : Err(e) => {
198 : if e.kind() == std::io::ErrorKind::NotFound {
199 : // This is legal: we may have been deployed with some external script
200 : // doing registration for us.
201 : tracing::info!("Metadata file not found at {metadata_path}");
202 : } else {
203 : on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
204 : }
205 : None
206 : }
207 : };
208 :
209 : let request = ReAttachRequest {
210 : node_id: self.node_id,
211 : register: register.clone(),
212 : };
213 :
214 : let response: ReAttachResponse = self.retry_http_forever(&url, request).await?;
215 : tracing::info!(
216 : "Received re-attach response with {} tenants (node {}, register: {:?})",
217 : response.tenants.len(),
218 : self.node_id,
219 : register,
220 : );
221 :
222 : failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
223 :
224 : Ok(response
225 : .tenants
226 : .into_iter()
227 0 : .map(|rart| (rart.id, rart))
228 : .collect::<HashMap<_, _>>())
229 : }
230 :
231 : /// Block until we get a successful response, or error out if we are shut down
232 : #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
233 : async fn validate(
234 : &self,
235 : tenants: Vec<(TenantShardId, Generation)>,
236 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
237 : let url = self
238 : .base_url
239 : .join("validate")
240 : .expect("Failed to build validate path");
241 :
242 : // When sending validate requests, break them up into chunks so that we
243 : // avoid possible edge cases of generating any HTTP requests that
244 : // require database I/O across many thousands of tenants.
245 : let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
246 : for tenant_chunk in (tenants).chunks(128) {
247 : let request = ValidateRequest {
248 : tenants: tenant_chunk
249 : .iter()
250 0 : .map(|(id, generation)| ValidateRequestTenant {
251 0 : id: *id,
252 0 : r#gen: (*generation).into().expect(
253 0 : "Generation should always be valid for a Tenant doing deletions",
254 0 : ),
255 0 : })
256 : .collect(),
257 : };
258 :
259 : failpoint_support::sleep_millis_async!(
260 : "control-plane-client-validate-sleep",
261 : &self.cancel
262 : );
263 : if self.cancel.is_cancelled() {
264 : return Err(RetryForeverError::ShuttingDown);
265 : }
266 :
267 : let response: ValidateResponse = self.retry_http_forever(&url, request).await?;
268 : for rt in response.tenants {
269 : result.insert(rt.id, rt.valid);
270 : }
271 : }
272 :
273 : Ok(result.into_iter().collect())
274 : }
275 : }
|