LCOV - code coverage report
Current view: top level - pageserver/src - control_plane_client.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 98.1 % 105 103
Test Date: 2024-02-12 20:26:03 Functions: 69.2 % 26 18

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : 
       3              : use futures::Future;
       4              : use pageserver_api::{
       5              :     control_api::{
       6              :         ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
       7              :     },
       8              :     shard::TenantShardId,
       9              : };
      10              : use serde::{de::DeserializeOwned, Serialize};
      11              : use tokio_util::sync::CancellationToken;
      12              : use url::Url;
      13              : use utils::{backoff, generation::Generation, id::NodeId};
      14              : 
      15              : use crate::config::PageServerConf;
      16              : 
      17              : /// The Pageserver's client for using the control plane API: this is a small subset
      18              : /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
      19              : pub struct ControlPlaneClient {
      20              :     http_client: reqwest::Client,
      21              :     base_url: Url,
      22              :     node_id: NodeId,
      23              :     cancel: CancellationToken,
      24              : }
      25              : 
      26              : /// Represent operations which internally retry on all errors other than
      27              : /// cancellation token firing: the only way they can fail is ShuttingDown.
      28              : pub enum RetryForeverError {
      29              :     ShuttingDown,
      30              : }
      31              : 
      32              : pub trait ControlPlaneGenerationsApi {
      33              :     fn re_attach(
      34              :         &self,
      35              :     ) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
      36              :     fn validate(
      37              :         &self,
      38              :         tenants: Vec<(TenantShardId, Generation)>,
      39              :     ) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
      40              : }
      41              : 
      42              : impl ControlPlaneClient {
      43              :     /// A None return value indicates that the input `conf` object does not have control
      44              :     /// plane API enabled.
      45         1247 :     pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
      46         1247 :         let mut url = match conf.control_plane_api.as_ref() {
      47         1245 :             Some(u) => u.clone(),
      48            2 :             None => return None,
      49              :         };
      50              : 
      51         1245 :         if let Ok(mut segs) = url.path_segments_mut() {
      52         1245 :             // This ensures that `url` ends with a slash if it doesn't already.
      53         1245 :             // That way, we can subsequently use join() to safely attach extra path elements.
      54         1245 :             segs.pop_if_empty().push("");
      55         1245 :         }
      56              : 
      57         1245 :         let mut client = reqwest::ClientBuilder::new();
      58              : 
      59         1245 :         if let Some(jwt) = &conf.control_plane_api_token {
      60           22 :             let mut headers = hyper::HeaderMap::new();
      61           22 :             headers.insert(
      62           22 :                 "Authorization",
      63           22 :                 format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
      64           22 :             );
      65           22 :             client = client.default_headers(headers);
      66         1223 :         }
      67              : 
      68         1245 :         Some(Self {
      69         1245 :             http_client: client.build().expect("Failed to construct HTTP client"),
      70         1245 :             base_url: url,
      71         1245 :             node_id: conf.id,
      72         1245 :             cancel: cancel.clone(),
      73         1245 :         })
      74         1247 :     }
      75              : 
      76         1020 :     async fn retry_http_forever<R, T>(
      77         1020 :         &self,
      78         1020 :         url: &url::Url,
      79         1020 :         request: R,
      80         1020 :     ) -> Result<T, RetryForeverError>
      81         1020 :     where
      82         1020 :         R: Serialize,
      83         1020 :         T: DeserializeOwned,
      84         1020 :     {
      85         1020 :         let res = backoff::retry(
      86         1030 :             || async {
      87         1030 :                 let response = self
      88         1030 :                     .http_client
      89         1030 :                     .post(url.clone())
      90         1030 :                     .json(&request)
      91         1030 :                     .send()
      92         2764 :                     .await?;
      93              : 
      94         1019 :                 response.error_for_status_ref()?;
      95         1019 :                 response.json::<T>().await
      96         2060 :             },
      97         1020 :             |_| false,
      98         1020 :             3,
      99         1020 :             u32::MAX,
     100         1020 :             "calling control plane generation validation API",
     101         1020 :             &self.cancel,
     102         1020 :         )
     103         2872 :         .await
     104         1020 :         .ok_or(RetryForeverError::ShuttingDown)?
     105         1019 :         .expect("We retry forever, this should never be reached");
     106         1019 : 
     107         1019 :         Ok(res)
     108         1020 :     }
     109              : }
     110              : 
     111              : impl ControlPlaneGenerationsApi for ControlPlaneClient {
     112              :     /// Block until we get a successful response, or error out if we are shut down
     113          622 :     async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
     114          622 :         let re_attach_path = self
     115          622 :             .base_url
     116          622 :             .join("re-attach")
     117          622 :             .expect("Failed to build re-attach path");
     118          622 :         let request = ReAttachRequest {
     119          622 :             node_id: self.node_id,
     120          622 :         };
     121              : 
     122            0 :         fail::fail_point!("control-plane-client-re-attach");
     123              : 
     124         1914 :         let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
     125          622 :         tracing::info!(
     126          622 :             "Received re-attach response with {} tenants",
     127          622 :             response.tenants.len()
     128          622 :         );
     129              : 
     130          622 :         Ok(response
     131          622 :             .tenants
     132          622 :             .into_iter()
     133          622 :             .map(|t| (t.id, Generation::new(t.gen)))
     134          622 :             .collect::<HashMap<_, _>>())
     135          622 :     }
     136              : 
     137              :     /// Block until we get a successful response, or error out if we are shut down
     138          398 :     async fn validate(
     139          398 :         &self,
     140          398 :         tenants: Vec<(TenantShardId, Generation)>,
     141          398 :     ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     142          398 :         let re_attach_path = self
     143          398 :             .base_url
     144          398 :             .join("validate")
     145          398 :             .expect("Failed to build validate path");
     146          398 : 
     147          398 :         let request = ValidateRequest {
     148          398 :             tenants: tenants
     149          398 :                 .into_iter()
     150          480 :                 .map(|(id, gen)| ValidateRequestTenant {
     151          480 :                     id,
     152          480 :                     gen: gen
     153          480 :                         .into()
     154          480 :                         .expect("Generation should always be valid for a Tenant doing deletions"),
     155          480 :                 })
     156          398 :                 .collect(),
     157          398 :         };
     158              : 
     159            0 :         fail::fail_point!("control-plane-client-validate");
     160              : 
     161          958 :         let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
     162              : 
     163          397 :         Ok(response
     164          397 :             .tenants
     165          397 :             .into_iter()
     166          479 :             .map(|rt| (rt.id, rt.valid))
     167          397 :             .collect())
     168          398 :     }
     169              : }
        

Generated by: LCOV version 2.1-beta