LCOV - code coverage report
Current view: top level - pageserver/src - control_plane_client.rs (source / functions) Coverage Total Hit
Test: 7eb96e224e685167ad85f58f858387d8cf253f63.info Lines: 0.0 % 152 0
Test Date: 2024-09-23 21:23:07 Functions: 0.0 % 25 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use futures::Future;
       4              : use pageserver_api::{
       5              :     controller_api::NodeRegisterRequest,
       6              :     shard::TenantShardId,
       7              :     upcall_api::{
       8              :         ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
       9              :         ValidateRequestTenant, ValidateResponse,
      10              :     },
      11              : };
      12              : use serde::{de::DeserializeOwned, Serialize};
      13              : use tokio_util::sync::CancellationToken;
      14              : use url::Url;
      15              : use utils::{backoff, failpoint_support, generation::Generation, id::NodeId};
      16              : 
      17              : use crate::{config::PageServerConf, virtual_file::on_fatal_io_error};
      18              : use pageserver_api::config::NodeMetadata;
      19              : 
      20              : /// The Pageserver's client for using the control plane API: this is a small subset
      21              : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
      22              : pub struct ControlPlaneClient {
      23              :     http_client: reqwest::Client,
      24              :     base_url: Url,
      25              :     node_id: NodeId,
      26              :     cancel: CancellationToken,
      27              : }
      28              : 
      29              : /// Represent operations which internally retry on all errors other than
      30              : /// cancellation token firing: the only way they can fail is ShuttingDown.
      31              : pub enum RetryForeverError {
      32              :     ShuttingDown,
      33              : }
      34              : 
      35              : pub trait ControlPlaneGenerationsApi {
      36              :     fn re_attach(
      37              :         &self,
      38              :         conf: &PageServerConf,
      39              :     ) -> impl Future<
      40              :         Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>,
      41              :     > + Send;
      42              :     fn validate(
      43              :         &self,
      44              :         tenants: Vec<(TenantShardId, Generation)>,
      45              :     ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
      46              : }
      47              : 
      48              : impl ControlPlaneClient {
      49              :     /// A None return value indicates that the input `conf` object does not have control
      50              :     /// plane API enabled.
      51            0 :     pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
      52            0 :         let mut url = match conf.control_plane_api.as_ref() {
      53            0 :             Some(u) => u.clone(),
      54            0 :             None => return None,
      55              :         };
      56              : 
      57            0 :         if let Ok(mut segs) = url.path_segments_mut() {
      58            0 :             // This ensures that `url` ends with a slash if it doesn't already.
      59            0 :             // That way, we can subsequently use join() to safely attach extra path elements.
      60            0 :             segs.pop_if_empty().push("");
      61            0 :         }
      62              : 
      63            0 :         let mut client = reqwest::ClientBuilder::new();
      64              : 
      65            0 :         if let Some(jwt) = &conf.control_plane_api_token {
      66            0 :             let mut headers = reqwest::header::HeaderMap::new();
      67            0 :             headers.insert(
      68            0 :                 "Authorization",
      69            0 :                 format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
      70            0 :             );
      71            0 :             client = client.default_headers(headers);
      72            0 :         }
      73              : 
      74            0 :         Some(Self {
      75            0 :             http_client: client.build().expect("Failed to construct HTTP client"),
      76            0 :             base_url: url,
      77            0 :             node_id: conf.id,
      78            0 :             cancel: cancel.clone(),
      79            0 :         })
      80            0 :     }
      81              : 
      82            0 :     async fn retry_http_forever<R, T>(
      83            0 :         &self,
      84            0 :         url: &url::Url,
      85            0 :         request: R,
      86            0 :     ) -> Result<T, RetryForeverError>
      87            0 :     where
      88            0 :         R: Serialize,
      89            0 :         T: DeserializeOwned,
      90            0 :     {
      91            0 :         let res = backoff::retry(
      92            0 :             || async {
      93            0 :                 let response = self
      94            0 :                     .http_client
      95            0 :                     .post(url.clone())
      96            0 :                     .json(&request)
      97            0 :                     .send()
      98            0 :                     .await?;
      99              : 
     100            0 :                 response.error_for_status_ref()?;
     101            0 :                 response.json::<T>().await
     102            0 :             },
     103            0 :             |_| false,
     104            0 :             3,
     105            0 :             u32::MAX,
     106            0 :             "calling control plane generation validation API",
     107            0 :             &self.cancel,
     108            0 :         )
     109            0 :         .await
     110            0 :         .ok_or(RetryForeverError::ShuttingDown)?
     111            0 :         .expect("We retry forever, this should never be reached");
     112            0 : 
     113            0 :         Ok(res)
     114            0 :     }
     115              : }
     116              : 
     117              : impl ControlPlaneGenerationsApi for ControlPlaneClient {
     118              :     /// Block until we get a successful response, or error out if we are shut down
     119            0 :     async fn re_attach(
     120            0 :         &self,
     121            0 :         conf: &PageServerConf,
     122            0 :     ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
     123            0 :         let re_attach_path = self
     124            0 :             .base_url
     125            0 :             .join("re-attach")
     126            0 :             .expect("Failed to build re-attach path");
     127            0 : 
     128            0 :         // Include registration content in the re-attach request if a metadata file is readable
     129            0 :         let metadata_path = conf.metadata_path();
     130            0 :         let register = match tokio::fs::read_to_string(&metadata_path).await {
     131            0 :             Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
     132            0 :                 Ok(m) => {
     133            0 :                     // Since we run one time at startup, be generous in our logging and
     134            0 :                     // dump all metadata.
     135            0 :                     tracing::info!(
     136            0 :                         "Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
     137              :                         m.postgres_host,
     138              :                         m.postgres_port,
     139              :                         m.http_host,
     140              :                         m.http_port,
     141              :                         m.other
     142              :                     );
     143              : 
     144            0 :                     let az_id = {
     145            0 :                         let az_id_from_metadata = m
     146            0 :                             .other
     147            0 :                             .get("availability_zone_id")
     148            0 :                             .and_then(|jv| jv.as_str().map(|str| str.to_owned()));
     149            0 : 
     150            0 :                         match az_id_from_metadata {
     151            0 :                             Some(az_id) => Some(az_id),
     152              :                             None => {
     153            0 :                                 tracing::warn!("metadata.json does not contain an 'availability_zone_id' field");
     154            0 :                                 conf.availability_zone.clone()
     155              :                             }
     156              :                         }
     157              :                     };
     158              : 
     159            0 :                     if az_id.is_none() {
     160            0 :                         panic!("Availablity zone id could not be inferred from metadata.json or pageserver config");
     161            0 :                     }
     162            0 : 
     163            0 :                     Some(NodeRegisterRequest {
     164            0 :                         node_id: conf.id,
     165            0 :                         listen_pg_addr: m.postgres_host,
     166            0 :                         listen_pg_port: m.postgres_port,
     167            0 :                         listen_http_addr: m.http_host,
     168            0 :                         listen_http_port: m.http_port,
     169            0 :                         availability_zone_id: az_id.expect("Checked above"),
     170            0 :                     })
     171              :                 }
     172            0 :                 Err(e) => {
     173            0 :                     tracing::error!("Unreadable metadata in {metadata_path}: {e}");
     174            0 :                     None
     175              :                 }
     176              :             },
     177            0 :             Err(e) => {
     178            0 :                 if e.kind() == std::io::ErrorKind::NotFound {
     179              :                     // This is legal: we may have been deployed with some external script
     180              :                     // doing registration for us.
     181            0 :                     tracing::info!("Metadata file not found at {metadata_path}");
     182              :                 } else {
     183            0 :                     on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
     184              :                 }
     185            0 :                 None
     186              :             }
     187              :         };
     188              : 
     189            0 :         let request = ReAttachRequest {
     190            0 :             node_id: self.node_id,
     191            0 :             register,
     192            0 :         };
     193              : 
     194            0 :         let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
     195            0 :         tracing::info!(
     196            0 :             "Received re-attach response with {} tenants",
     197            0 :             response.tenants.len()
     198              :         );
     199              : 
     200            0 :         failpoint_support::sleep_millis_async!("control-plane-client-re-attach");
     201              : 
     202            0 :         Ok(response
     203            0 :             .tenants
     204            0 :             .into_iter()
     205            0 :             .map(|rart| (rart.id, rart))
     206            0 :             .collect::<HashMap<_, _>>())
     207            0 :     }
     208              : 
     209              :     /// Block until we get a successful response, or error out if we are shut down
     210            0 :     async fn validate(
     211            0 :         &self,
     212            0 :         tenants: Vec<(TenantShardId, Generation)>,
     213            0 :     ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     214            0 :         let re_attach_path = self
     215            0 :             .base_url
     216            0 :             .join("validate")
     217            0 :             .expect("Failed to build validate path");
     218            0 : 
     219            0 :         let request = ValidateRequest {
     220            0 :             tenants: tenants
     221            0 :                 .into_iter()
     222            0 :                 .map(|(id, gen)| ValidateRequestTenant {
     223            0 :                     id,
     224            0 :                     gen: gen
     225            0 :                         .into()
     226            0 :                         .expect("Generation should always be valid for a Tenant doing deletions"),
     227            0 :                 })
     228            0 :                 .collect(),
     229            0 :         };
     230            0 : 
     231            0 :         failpoint_support::sleep_millis_async!("control-plane-client-validate-sleep", &self.cancel);
     232            0 :         if self.cancel.is_cancelled() {
     233            0 :             return Err(RetryForeverError::ShuttingDown);
     234            0 :         }
     235              : 
     236            0 :         let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
     237              : 
     238            0 :         Ok(response
     239            0 :             .tenants
     240            0 :             .into_iter()
     241            0 :             .map(|rt| (rt.id, rt.valid))
     242            0 :             .collect())
     243            0 :     }
     244              : }
        

Generated by: LCOV version 2.1-beta