LCOV - code coverage report
Current view: top level - pageserver/src - control_plane_client.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 100.0 % 107 107
Test Date: 2024-02-07 07:37:29 Functions: 69.6 % 23 16

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use futures::Future;
       4              : use pageserver_api::{
       5              :     control_api::{
       6              :         ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
       7              :     },
       8              :     shard::TenantShardId,
       9              : };
      10              : use serde::{de::DeserializeOwned, Serialize};
      11              : use tokio_util::sync::CancellationToken;
      12              : use url::Url;
      13              : use utils::{backoff, generation::Generation, id::NodeId};
      14              : 
      15              : use crate::config::PageServerConf;
      16              : 
      17              : /// The Pageserver's client for using the control plane API: this is a small subset
      18              : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
      19              : pub struct ControlPlaneClient {
                            /// HTTP client built in `new`; when `conf.control_plane_api_token` is set it is
                            /// configured with a default `Authorization: Bearer <token>` header.
      20              :     http_client: reqwest::Client,
                            /// Clone of `conf.control_plane_api`, which `new` adjusts to end with a slash so that
                            /// `join()` can append the endpoint names ("re-attach", "validate").
      21              :     base_url: Url,
                            /// Copy of `conf.id`; sent as `node_id` in `ReAttachRequest`.
      22              :     node_id: NodeId,
                            /// Clone of the token passed to `new`; handed to `backoff::retry` by
                            /// `retry_http_forever`.
      23              :     cancel: CancellationToken,
      24              : }
      25              : 
      26              : /// Error type for operations which internally retry on all errors other than the
      27              : /// cancellation token firing: the only way they can fail is `ShuttingDown`.
      28              : pub enum RetryForeverError {
                            /// Returned by `retry_http_forever` when `backoff::retry` yields `None`
                            /// (presumably because the cancellation token fired -- confirm in `utils::backoff`).
      29              :     ShuttingDown,
      30              : }
      31              : 
      32              : pub trait ControlPlaneGenerationsApi {
                            // The generation-related control plane calls. Both methods are declared as returning
                            // `impl Future + Send` (rather than as `async fn`), so implementations must produce
                            // futures that are `Send`. `ControlPlaneClient` implements this trait in this file.

                            /// Resolves to a map from tenant shard to `Generation`, or to a `RetryForeverError`.
      33              :     fn re_attach(
      34              :         &self,
      35              :     ) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
                            /// Takes `(TenantShardId, Generation)` pairs and resolves to a map from tenant shard
                            /// to a `bool`, or to a `RetryForeverError`. In the `ControlPlaneClient`
                            /// implementation the `bool` is the `valid` field of the matching response entry.
      36              :     fn validate(
      37              :         &self,
      38              :         tenants: Vec<(TenantShardId, Generation)>,
      39              :     ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
      40              : }
      41              : 
      42              : impl ControlPlaneClient {
      43              :     /// A None return value indicates that the input `conf` object does not have control
      44              :     /// plane API enabled.
                            ///
                            /// Otherwise the configured `control_plane_api` URL is cloned and adjusted to end with
                            /// a slash, and an HTTP client is built; if `conf.control_plane_api_token` is set, the
                            /// client gets a default `Authorization: Bearer <token>` header.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the `Authorization` header value fails to parse, or if the HTTP client
                            /// cannot be constructed.
      45         1207 :     pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
      46         1207 :         let mut url = match conf.control_plane_api.as_ref() {
      47         1205 :             Some(u) => u.clone(),
      48            2 :             None => return None,
      49              :         };
      50              : 
                                // NOTE(review): if `path_segments_mut()` returns `Err`, the URL is used exactly as
                                // configured and no trailing slash is added. Per the `url` crate docs this happens
                                // for cannot-be-a-base URLs -- confirm that such a value cannot reach this point.
      51         1205 :         if let Ok(mut segs) = url.path_segments_mut() {
      52         1205 :             // This ensures that `url` ends with a slash if it doesn't already.
      53         1205 :             // That way, we can subsequently use join() to safely attach extra path elements.
      54         1205 :             segs.pop_if_empty().push("");
      55         1205 :         }
      56              : 
      57         1205 :         let mut client = reqwest::ClientBuilder::new();
      58              : 
                                // Only attach an Authorization header when a token is configured.
      59         1205 :         if let Some(jwt) = &conf.control_plane_api_token {
      60           22 :             let mut headers = hyper::HeaderMap::new();
      61           22 :             headers.insert(
      62           22 :                 "Authorization",
      63           22 :                 format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
      64           22 :             );
      65           22 :             client = client.default_headers(headers);
      66         1183 :         }
      67              : 
      68         1205 :         Some(Self {
      69         1205 :             http_client: client.build().expect("Failed to construct HTTP client"),
      70         1205 :             base_url: url,
      71         1205 :             node_id: conf.id,
      72         1205 :             cancel: cancel.clone(),
      73         1205 :         })
      74         1207 :     }
      75              : 
                            /// POSTs `request` as a JSON body to `url` and deserializes the JSON response as `T`.
                            ///
                            /// The attempt runs inside `backoff::retry`. A failed send, an error status reported by
                            /// `error_for_status_ref`, and a response body that does not decode as `T` are all
                            /// returned to `backoff::retry` as errors.
                            ///
                            /// # Errors
                            ///
                            /// Returns `RetryForeverError::ShuttingDown` when `backoff::retry` yields `None`
                            /// (presumably because `self.cancel` fired -- confirm in `utils::backoff`).
                            ///
                            /// # Panics
                            ///
                            /// Panics if `backoff::retry` yields `Some(Err(_))`; the `expect` message states that
                            /// this should never happen.
      76         1013 :     async fn retry_http_forever<R, T>(
      77         1013 :         &self,
      78         1013 :         url: &url::Url,
      79         1013 :         request: R,
      80         1013 :     ) -> Result<T, RetryForeverError>
      81         1013 :     where
      82         1013 :         R: Serialize,
      83         1013 :         T: DeserializeOwned,
      84         1013 :     {
      85         1013 :         let res = backoff::retry(
      86         1022 :             || async {
      87         1022 :                 let response = self
      88         1022 :                     .http_client
      89         1022 :                     .post(url.clone())
      90         1022 :                     .json(&request)
      91         1022 :                     .send()
      92         2700 :                     .await?;
      93              : 
                                        // Convert an HTTP error status (4xx/5xx per the reqwest docs) into an `Err`
                                        // so that it is handled like a transport failure.
      94         1012 :                 response.error_for_status_ref()?;
      95         1012 :                 response.json::<T>().await
      96         1022 :             },
                                    // NOTE(review): presumably the "is this error permanent?" predicate, so that no
                                    // error ever stops the retries -- confirm against `utils::backoff::retry`.
      97         1013 :             |_| false,
                                    // NOTE(review): the next two values are presumably the warning threshold and the
                                    // maximum number of retries -- confirm against `utils::backoff::retry`.
      98         1013 :             3,
      99         1013 :             u32::MAX,
                                    // NOTE(review): this description says "validation", but `re_attach` goes through
                                    // this helper as well.
     100         1013 :             "calling control plane generation validation API",
     101         1013 :             &self.cancel,
     102         1013 :         )
     103         2772 :         .await
                                // `None` from the retry helper is reported as shutdown; an inner `Err` is treated
                                // as a broken invariant and panics.
     104         1013 :         .ok_or(RetryForeverError::ShuttingDown)?
     105         1012 :         .expect("We retry forever, this should never be reached");
     106         1012 : 
     107         1012 :         Ok(res)
     108         1013 :     }
     109              : }
     110              : 
     111              : impl ControlPlaneGenerationsApi for ControlPlaneClient {
     112              :     /// Block until we get a successful response, or error out if we are shut down
                            ///
                            /// POSTs a `ReAttachRequest` carrying this node's id to `<base_url>re-attach` and
                            /// returns the generation reported for each tenant shard in the response.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the endpoint URL cannot be built from `base_url`.
     113          602 :     async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
     114          602 :         let re_attach_path = self
     115          602 :             .base_url
     116          602 :             .join("re-attach")
     117          602 :             .expect("Failed to build re-attach path");
     118          602 :         let request = ReAttachRequest {
     119          602 :             node_id: self.node_id,
     120          602 :         };
     121          602 : 
                                // Failpoint hook, evaluated before the request is sent.
     122          602 :         fail::fail_point!("control-plane-client-re-attach");
     123              : 
     124         1826 :         let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
     125          602 :         tracing::info!(
     126          602 :             "Received re-attach response with {} tenants",
     127          602 :             response.tenants.len()
     128          602 :         );
     129              : 
                                // Re-key the response entries by tenant shard id, wrapping each raw generation
                                // number in `Generation`.
     130          602 :         Ok(response
     131          602 :             .tenants
     132          602 :             .into_iter()
     133          602 :             .map(|t| (t.id, Generation::new(t.gen)))
     134          602 :             .collect::<HashMap<_, _>>())
     135          602 :     }
     136              : 
     137              :     /// Block until we get a successful response, or error out if we are shut down
                            ///
                            /// POSTs the given `(TenantShardId, Generation)` pairs to `<base_url>validate` and
                            /// returns the `valid` flag of each response entry, keyed by tenant shard id.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the endpoint URL cannot be built from `base_url`, or if converting one of
                            /// the given generations for the request yields no value.
     138          411 :     async fn validate(
     139          411 :         &self,
     140          411 :         tenants: Vec<(TenantShardId, Generation)>,
     141          411 :     ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     142          411 :         let validate_path = self
     143          411 :             .base_url
     144          411 :             .join("validate")
     145          411 :             .expect("Failed to build validate path");
     146          411 : 
     147          411 :         let request = ValidateRequest {
     148          411 :             tenants: tenants
     149          411 :                 .into_iter()
     150          498 :                 .map(|(id, gen)| ValidateRequestTenant {
     151          498 :                     id,
     152          498 :                     gen: gen
     153          498 :                         .into()
     154          498 :                         .expect("Generation should always be valid for a Tenant doing deletions"),
     155          498 :                 })
     156          411 :                 .collect(),
     157          411 :         };
     158          411 : 
                                // Failpoint hook, evaluated before the request is sent.
     159          411 :         fail::fail_point!("control-plane-client-validate");
     160              : 
     161          946 :         let response: ValidateResponse = self.retry_http_forever(&validate_path, request).await?;
     162              : 
     163          410 :         Ok(response
     164          410 :             .tenants
     165          410 :             .into_iter()
     166          497 :             .map(|rt| (rt.id, rt.valid))
     167          410 :             .collect())
     168          411 :     }
     169              : }
        

Generated by: LCOV version 2.1-beta