Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use futures::Future;
4 : use pageserver_api::{
5 : controller_api::{AvailabilityZone, NodeRegisterRequest},
6 : shard::TenantShardId,
7 : upcall_api::{
8 : ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
9 : ValidateRequestTenant, ValidateResponse,
10 : },
11 : };
12 : use serde::{de::DeserializeOwned, Serialize};
13 : use tokio_util::sync::CancellationToken;
14 : use url::Url;
15 : use utils::{backoff, failpoint_support, generation::Generation, id::NodeId};
16 :
17 : use crate::{config::PageServerConf, virtual_file::on_fatal_io_error};
18 : use pageserver_api::config::NodeMetadata;
19 :
20 : /// The Pageserver's client for using the storage controller upcall API: this is a small API
21 : /// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
22 : ///
23 : /// The server presenting this API may either be the storage controller or some other
24 : /// service (such as the Neon control plane) providing a store of generation numbers.
25 : pub struct ControllerUpcallClient {
26 : http_client: reqwest::Client,
27 : base_url: Url,
28 : node_id: NodeId,
29 : cancel: CancellationToken,
30 : }
31 :
/// Error for operations that internally retry on every failure except cancellation.
///
/// Because all other errors are retried, the only way such an operation can fail is
/// [`RetryForeverError::ShuttingDown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryForeverError {
    /// The cancellation token fired before the operation succeeded.
    ShuttingDown,
}
37 :
38 : pub trait ControlPlaneGenerationsApi {
39 : fn re_attach(
40 : &self,
41 : conf: &PageServerConf,
42 : ) -> impl Future<
43 : Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
44 : > + Send;
45 : fn validate(
46 : &self,
47 : tenants: Vec<(TenantShardId, Generation)>,
48 : ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
49 : }
50 :
51 : impl ControllerUpcallClient {
52 : /// A None return value indicates that the input `conf` object does not have control
53 : /// plane API enabled.
54 0 : pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
55 0 : let mut url = match conf.control_plane_api.as_ref() {
56 0 : Some(u) => u.clone(),
57 0 : None => return None,
58 : };
59 :
60 0 : if let Ok(mut segs) = url.path_segments_mut() {
61 0 : // This ensures that `url` ends with a slash if it doesn't already.
62 0 : // That way, we can subsequently use join() to safely attach extra path elements.
63 0 : segs.pop_if_empty().push("");
64 0 : }
65 :
66 0 : let mut client = reqwest::ClientBuilder::new();
67 :
68 0 : if let Some(jwt) = &conf.control_plane_api_token {
69 0 : let mut headers = reqwest::header::HeaderMap::new();
70 0 : headers.insert(
71 0 : "Authorization",
72 0 : format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
73 0 : );
74 0 : client = client.default_headers(headers);
75 0 : }
76 :
77 0 : Some(Self {
78 0 : http_client: client.build().expect("Failed to construct HTTP client"),
79 0 : base_url: url,
80 0 : node_id: conf.id,
81 0 : cancel: cancel.clone(),
82 0 : })
83 0 : }
84 :
85 0 : async fn retry_http_forever<R, T>(
86 0 : &self,
87 0 : url: &url::Url,
88 0 : request: R,
89 0 : ) -> Result<T, RetryForeverError>
90 0 : where
91 0 : R: Serialize,
92 0 : T: DeserializeOwned,
93 0 : {
94 0 : let res = backoff::retry(
95 0 : || async {
96 0 : let response = self
97 0 : .http_client
98 0 : .post(url.clone())
99 0 : .json(&request)
100 0 : .send()
101 0 : .await?;
102 :
103 0 : response.error_for_status_ref()?;
104 0 : response.json::<T>().await
105 0 : },
106 0 : |_| false,
107 0 : 3,
108 0 : u32::MAX,
109 0 : "calling control plane generation validation API",
110 0 : &self.cancel,
111 0 : )
112 0 : .await
113 0 : .ok_or(RetryForeverError::ShuttingDown)?
114 0 : .expect("We retry forever, this should never be reached");
115 0 :
116 0 : Ok(res)
117 0 : }
118 :
119 0 : pub(crate) fn base_url(&self) -> &Url {
120 0 : &self.base_url
121 0 : }
122 : }
123 :
124 : impl ControlPlaneGenerationsApi for ControllerUpcallClient {
125 : /// Block until we get a successful response, or error out if we are shut down
126 0 : async fn re_attach(
127 0 : &self,
128 0 : conf: &PageServerConf,
129 0 : ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
130 0 : let re_attach_path = self
131 0 : .base_url
132 0 : .join("re-attach")
133 0 : .expect("Failed to build re-attach path");
134 0 :
135 0 : // Include registration content in the re-attach request if a metadata file is readable
136 0 : let metadata_path = conf.metadata_path();
137 0 : let register = match tokio::fs::read_to_string(&metadata_path).await {
138 0 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
139 0 : Ok(m) => {
140 0 : // Since we run one time at startup, be generous in our logging and
141 0 : // dump all metadata.
142 0 : tracing::info!(
143 0 : "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
144 : m.postgres_host,
145 : m.postgres_port,
146 : m.http_host,
147 : m.http_port,
148 : m.other
149 : );
150 :
151 0 : let az_id = {
152 0 : let az_id_from_metadata = m
153 0 : .other
154 0 : .get("availability_zone_id")
155 0 : .and_then(|jv| jv.as_str().map(|str| str.to_owned()));
156 0 :
157 0 : match az_id_from_metadata {
158 0 : Some(az_id) => Some(AvailabilityZone(az_id)),
159 : None => {
160 0 : tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
161 0 : conf.availability_zone.clone().map(AvailabilityZone)
162 : }
163 : }
164 : };
165 :
166 0 : if az_id.is_none() {
167 0 : panic!("Availablity zone id could not be inferred from metadata.json or pageserver config");
168 0 : }
169 0 :
170 0 : Some(NodeRegisterRequest {
171 0 : node_id: conf.id,
172 0 : listen_pg_addr: m.postgres_host,
173 0 : listen_pg_port: m.postgres_port,
174 0 : listen_http_addr: m.http_host,
175 0 : listen_http_port: m.http_port,
176 0 : availability_zone_id: az_id.expect("Checked above"),
177 0 : })
178 : }
179 0 : Err(e) => {
180 0 : tracing::error!("Unreadable metadata in {metadata_path}: {e}");
181 0 : None
182 : }
183 : },
184 0 : Err(e) => {
185 0 : if e.kind() == std::io::ErrorKind::NotFound {
186 : // This is legal: we may have been deployed with some external script
187 : // doing registration for us.
188 0 : tracing::info!("Metadata file not found at {metadata_path}");
189 : } else {
190 0 : on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
191 : }
192 0 : None
193 : }
194 : };
195 :
196 0 : let request = ReAttachRequest {
197 0 : node_id: self.node_id,
198 0 : register: register.clone(),
199 0 : };
200 :
201 0 : let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
202 0 : tracing::info!(
203 0 : "Received re-attach response with {} tenants (node {}, register: {:?})",
204 0 : response.tenants.len(),
205 : self.node_id,
206 : register,
207 : );
208 :
209 0 : failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
210 :
211 0 : Ok(response
212 0 : .tenants
213 0 : .into_iter()
214 0 : .map(|rart| (rart.id, rart))
215 0 : .collect::<HashMap<_, _>>())
216 0 : }
217 :
218 : /// Block until we get a successful response, or error out if we are shut down
219 0 : async fn validate(
220 0 : &self,
221 0 : tenants: Vec<(TenantShardId, Generation)>,
222 0 : ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
223 0 : let re_attach_path = self
224 0 : .base_url
225 0 : .join("validate")
226 0 : .expect("Failed to build validate path");
227 0 :
228 0 : // When sending validate requests, break them up into chunks so that we
229 0 : // avoid possible edge cases of generating any HTTP requests that
230 0 : // require database I/O across many thousands of tenants.
231 0 : let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
232 0 : for tenant_chunk in (tenants).chunks(128) {
233 0 : let request = ValidateRequest {
234 0 : tenants: tenant_chunk
235 0 : .iter()
236 0 : .map(|(id, generation)| ValidateRequestTenant {
237 0 : id: *id,
238 0 : gen: (*generation).into().expect(
239 0 : "Generation should always be valid for a Tenant doing deletions",
240 0 : ),
241 0 : })
242 0 : .collect(),
243 0 : };
244 0 :
245 0 : failpoint_support::sleep_millis_async!(
246 : "control-plane-client-validate-sleep",
247 0 : &self.cancel
248 : );
249 0 : if self.cancel.is_cancelled() {
250 0 : return Err(RetryForeverError::ShuttingDown);
251 0 : }
252 :
253 0 : let response: ValidateResponse =
254 0 : self.retry_http_forever(&re_attach_path, request).await?;
255 0 : for rt in response.tenants {
256 0 : result.insert(rt.id, rt.valid);
257 0 : }
258 : }
259 :
260 0 : Ok(result.into_iter().collect())
261 0 : }
262 : }
|