LCOV - code coverage report
Current view: top level - pageserver/src - controller_upcall_client.rs (source / functions) Coverage Total Hit
Test: c28d23d327d4ca6acc894004f1432d7b7eea829c.info Lines: 0.0 % 51 0
Test Date: 2025-03-21 14:50:36 Functions: 0.0 % 13 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use futures::Future;
       4              : use pageserver_api::config::NodeMetadata;
       5              : use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
       6              : use pageserver_api::shard::TenantShardId;
       7              : use pageserver_api::upcall_api::{
       8              :     ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
       9              :     ValidateRequestTenant, ValidateResponse,
      10              : };
      11              : use serde::Serialize;
      12              : use serde::de::DeserializeOwned;
      13              : use tokio_util::sync::CancellationToken;
      14              : use url::Url;
      15              : use utils::generation::Generation;
      16              : use utils::id::NodeId;
      17              : use utils::{backoff, failpoint_support};
      18              : 
      19              : use crate::config::PageServerConf;
      20              : use crate::virtual_file::on_fatal_io_error;
      21              : 
      22              : /// The Pageserver's client for using the storage controller upcall API: this is a small API
      23              : /// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
      24              : pub struct StorageControllerUpcallClient {
      25              :     http_client: reqwest::Client,
      26              :     base_url: Url,
      27              :     node_id: NodeId,
      28              :     cancel: CancellationToken,
      29              : }
      30              : 
      31              : /// Represent operations which internally retry on all errors other than
      32              : /// cancellation token firing: the only way they can fail is ShuttingDown.
      33              : pub enum RetryForeverError {
      34              :     ShuttingDown,
      35              : }
      36              : 
      37              : pub trait StorageControllerUpcallApi {
      38              :     fn re_attach(
      39              :         &self,
      40              :         conf: &PageServerConf,
      41              :     ) -> impl Future<
      42              :         Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
      43              :     > + Send;
      44              :     fn validate(
      45              :         &self,
      46              :         tenants: Vec<(TenantShardId, Generation)>,
      47              :     ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
      48              : }
      49              : 
      50              : impl StorageControllerUpcallClient {
      51              :     /// A None return value indicates that the input `conf` object does not have control
      52              :     /// plane API enabled.
      53            0 :     pub fn new(
      54            0 :         conf: &'static PageServerConf,
      55            0 :         cancel: &CancellationToken,
      56            0 :     ) -> Result<Option<Self>, reqwest::Error> {
      57            0 :         let mut url = match conf.control_plane_api.as_ref() {
      58            0 :             Some(u) => u.clone(),
      59            0 :             None => return Ok(None),
      60              :         };
      61              : 
      62            0 :         if let Ok(mut segs) = url.path_segments_mut() {
      63            0 :             // This ensures that `url` ends with a slash if it doesn't already.
      64            0 :             // That way, we can subsequently use join() to safely attach extra path elements.
      65            0 :             segs.pop_if_empty().push("");
      66            0 :         }
      67              : 
      68            0 :         let mut client = reqwest::ClientBuilder::new();
      69              : 
      70            0 :         if let Some(jwt) = &conf.control_plane_api_token {
      71            0 :             let mut headers = reqwest::header::HeaderMap::new();
      72            0 :             headers.insert(
      73            0 :                 "Authorization",
      74            0 :                 format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
      75            0 :             );
      76            0 :             client = client.default_headers(headers);
      77            0 :         }
      78              : 
      79            0 :         if let Some(ssl_ca_cert) = &conf.ssl_ca_cert {
      80            0 :             client = client.add_root_certificate(ssl_ca_cert.clone());
      81            0 :         }
      82              : 
      83              :         Ok(Some(Self {
      84            0 :             http_client: client.build()?,
      85            0 :             base_url: url,
      86            0 :             node_id: conf.id,
      87            0 :             cancel: cancel.clone(),
      88              :         }))
      89            0 :     }
      90              : 
      91              :     #[tracing::instrument(skip_all)]
      92              :     async fn retry_http_forever<R, T>(
      93              :         &self,
      94              :         url: &url::Url,
      95              :         request: R,
      96              :     ) -> Result<T, RetryForeverError>
      97              :     where
      98              :         R: Serialize,
      99              :         T: DeserializeOwned,
     100              :     {
     101              :         let res = backoff::retry(
     102            0 :             || async {
     103            0 :                 let response = self
     104            0 :                     .http_client
     105            0 :                     .post(url.clone())
     106            0 :                     .json(&request)
     107            0 :                     .send()
     108            0 :                     .await?;
     109              : 
     110            0 :                 response.error_for_status_ref()?;
     111            0 :                 response.json::<T>().await
     112            0 :             },
     113            0 :             |_| false,
     114              :             3,
     115              :             u32::MAX,
     116              :             "storage controller upcall",
     117              :             &self.cancel,
     118              :         )
     119              :         .await
     120              :         .ok_or(RetryForeverError::ShuttingDown)?
     121              :         .expect("We retry forever, this should never be reached");
     122              : 
     123              :         Ok(res)
     124              :     }
     125              : 
     126            0 :     pub(crate) fn base_url(&self) -> &Url {
     127            0 :         &self.base_url
     128            0 :     }
     129              : }
     130              : 
     131              : impl StorageControllerUpcallApi for StorageControllerUpcallClient {
     132              :     /// Block until we get a successful response, or error out if we are shut down
     133              :     #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
     134              :     async fn re_attach(
     135              :         &self,
     136              :         conf: &PageServerConf,
     137              :     ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
     138              :         let url = self
     139              :             .base_url
     140              :             .join("re-attach")
     141              :             .expect("Failed to build re-attach path");
     142              : 
     143              :         // Include registration content in the re-attach request if a metadata file is readable
     144              :         let metadata_path = conf.metadata_path();
     145              :         let register = match tokio::fs::read_to_string(&metadata_path).await {
     146              :             Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
     147              :                 Ok(m) => {
     148              :                     // Since we run one time at startup, be generous in our logging and
     149              :                     // dump all metadata.
     150              :                     tracing::info!(
     151              :                         "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
     152              :                         m.postgres_host,
     153              :                         m.postgres_port,
     154              :                         m.http_host,
     155              :                         m.http_port,
     156              :                         m.other
     157              :                     );
     158              : 
     159              :                     let az_id = {
     160              :                         let az_id_from_metadata = m
     161              :                             .other
     162              :                             .get("availability_zone_id")
     163            0 :                             .and_then(|jv| jv.as_str().map(|str| str.to_owned()));
     164              : 
     165              :                         match az_id_from_metadata {
     166              :                             Some(az_id) => Some(AvailabilityZone(az_id)),
     167              :                             None => {
     168              :                                 tracing::warn!(
     169              :                                     "metadata.json does not contain an 'availability_zone_id' field"
     170              :                                 );
     171              :                                 conf.availability_zone.clone().map(AvailabilityZone)
     172              :                             }
     173              :                         }
     174              :                     };
     175              : 
     176              :                     if az_id.is_none() {
     177              :                         panic!(
     178              :                             "Availablity zone id could not be inferred from metadata.json or pageserver config"
     179              :                         );
     180              :                     }
     181              : 
     182              :                     Some(NodeRegisterRequest {
     183              :                         node_id: conf.id,
     184              :                         listen_pg_addr: m.postgres_host,
     185              :                         listen_pg_port: m.postgres_port,
     186              :                         listen_http_addr: m.http_host,
     187              :                         listen_http_port: m.http_port,
     188              :                         listen_https_port: m.https_port,
     189              :                         availability_zone_id: az_id.expect("Checked above"),
     190              :                     })
     191              :                 }
     192              :                 Err(e) => {
     193              :                     tracing::error!("Unreadable metadata in {metadata_path}: {e}");
     194              :                     None
     195              :                 }
     196              :             },
     197              :             Err(e) => {
     198              :                 if e.kind() == std::io::ErrorKind::NotFound {
     199              :                     // This is legal: we may have been deployed with some external script
     200              :                     // doing registration for us.
     201              :                     tracing::info!("Metadata file not found at {metadata_path}");
     202              :                 } else {
     203              :                     on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
     204              :                 }
     205              :                 None
     206              :             }
     207              :         };
     208              : 
     209              :         let request = ReAttachRequest {
     210              :             node_id: self.node_id,
     211              :             register: register.clone(),
     212              :         };
     213              : 
     214              :         let response: ReAttachResponse = self.retry_http_forever(&url, request).await?;
     215              :         tracing::info!(
     216              :             "Received re-attach response with {} tenants (node {}, register: {:?})",
     217              :             response.tenants.len(),
     218              :             self.node_id,
     219              :             register,
     220              :         );
     221              : 
     222              :         failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
     223              : 
     224              :         Ok(response
     225              :             .tenants
     226              :             .into_iter()
     227            0 :             .map(|rart| (rart.id, rart))
     228              :             .collect::<HashMap<_, _>>())
     229              :     }
     230              : 
     231              :     /// Block until we get a successful response, or error out if we are shut down
     232              :     #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
     233              :     async fn validate(
     234              :         &self,
     235              :         tenants: Vec<(TenantShardId, Generation)>,
     236              :     ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     237              :         let url = self
     238              :             .base_url
     239              :             .join("validate")
     240              :             .expect("Failed to build validate path");
     241              : 
     242              :         // When sending validate requests, break them up into chunks so that we
     243              :         // avoid possible edge cases of generating any HTTP requests that
     244              :         // require database I/O across many thousands of tenants.
     245              :         let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
     246              :         for tenant_chunk in (tenants).chunks(128) {
     247              :             let request = ValidateRequest {
     248              :                 tenants: tenant_chunk
     249              :                     .iter()
     250            0 :                     .map(|(id, generation)| ValidateRequestTenant {
     251            0 :                         id: *id,
     252            0 :                         r#gen: (*generation).into().expect(
     253            0 :                             "Generation should always be valid for a Tenant doing deletions",
     254            0 :                         ),
     255            0 :                     })
     256              :                     .collect(),
     257              :             };
     258              : 
     259              :             failpoint_support::sleep_millis_async!(
     260              :                 "control-plane-client-validate-sleep",
     261              :                 &self.cancel
     262              :             );
     263              :             if self.cancel.is_cancelled() {
     264              :                 return Err(RetryForeverError::ShuttingDown);
     265              :             }
     266              : 
     267              :             let response: ValidateResponse = self.retry_http_forever(&url, request).await?;
     268              :             for rt in response.tenants {
     269              :                 result.insert(rt.id, rt.valid);
     270              :             }
     271              :         }
     272              : 
     273              :         Ok(result.into_iter().collect())
     274              :     }
     275              : }
        

Generated by: LCOV version 2.1-beta