LCOV - code coverage report
Current view: top level - storage_controller/src - safekeeper.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 0.0 % 148 0
Test Date: 2025-03-12 00:01:28 Functions: 0.0 % 32 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : 
       3              : use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
       4              : use reqwest::{Certificate, StatusCode};
       5              : use safekeeper_client::mgmt_api;
       6              : use tokio_util::sync::CancellationToken;
       7              : use utils::backoff;
       8              : use utils::id::NodeId;
       9              : use utils::logging::SecretString;
      10              : 
      11              : use crate::heartbeater::SafekeeperState;
      12              : use crate::persistence::{DatabaseError, SafekeeperPersistence};
      13              : use crate::safekeeper_client::SafekeeperClient;
      14              : 
      15              : #[derive(Clone)]
      16              : pub struct Safekeeper {
      17              :     pub(crate) skp: SafekeeperPersistence,
      18              :     cancel: CancellationToken,
      19              :     listen_http_addr: String,
      20              :     listen_http_port: u16,
      21              :     listen_https_port: Option<u16>,
      22              :     scheduling_policy: SkSchedulingPolicy,
      23              :     id: NodeId,
      24              :     /// Heartbeating result.
      25              :     availability: SafekeeperState,
      26              : 
      27              :     // Flag from storcon's config to use https for safekeeper API.
      28              :     // Invariant: if |true|, listen_https_port should contain a value.
      29              :     use_https: bool,
      30              : }
      31              : 
      32              : impl Safekeeper {
      33            0 :     pub(crate) fn from_persistence(
      34            0 :         skp: SafekeeperPersistence,
      35            0 :         cancel: CancellationToken,
      36            0 :         use_https: bool,
      37            0 :     ) -> anyhow::Result<Self> {
      38            0 :         if use_https && skp.https_port.is_none() {
      39            0 :             anyhow::bail!(
      40            0 :                 "cannot load safekeeper {} from persistence: \
      41            0 :                 https is enabled, but https port is not specified",
      42            0 :                 skp.id,
      43            0 :             );
      44            0 :         }
      45            0 : 
      46            0 :         let scheduling_policy = skp.scheduling_policy.0;
      47            0 :         Ok(Self {
      48            0 :             cancel,
      49            0 :             listen_http_addr: skp.host.clone(),
      50            0 :             listen_http_port: skp.http_port as u16,
      51            0 :             listen_https_port: skp.https_port.map(|x| x as u16),
      52            0 :             id: NodeId(skp.id as u64),
      53            0 :             skp,
      54            0 :             availability: SafekeeperState::Offline,
      55            0 :             scheduling_policy,
      56            0 :             use_https,
      57            0 :         })
      58            0 :     }
      59              : 
      60            0 :     pub(crate) fn base_url(&self) -> String {
      61            0 :         if self.use_https {
      62            0 :             format!(
      63            0 :                 "https://{}:{}",
      64            0 :                 self.listen_http_addr,
      65            0 :                 self.listen_https_port
      66            0 :                     .expect("https port should be specified if use_https is on"),
      67            0 :             )
      68              :         } else {
      69            0 :             format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
      70              :         }
      71            0 :     }
      72              : 
      73            0 :     pub(crate) fn get_id(&self) -> NodeId {
      74            0 :         self.id
      75            0 :     }
      76            0 :     pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
      77            0 :         self.skp.as_describe_response()
      78            0 :     }
      79            0 :     pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
      80            0 :         self.availability = availability;
      81            0 :     }
      82            0 :     pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
      83            0 :         self.scheduling_policy
      84            0 :     }
      85            0 :     pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
      86            0 :         self.scheduling_policy = scheduling_policy;
      87            0 :         self.skp.scheduling_policy = scheduling_policy.into();
      88            0 :     }
      89            0 :     pub(crate) fn availability(&self) -> SafekeeperState {
      90            0 :         self.availability.clone()
      91            0 :     }
      92              :     /// Perform an operation (which is given a [`SafekeeperClient`]) with retries
      93              :     #[allow(clippy::too_many_arguments)]
      94            0 :     pub(crate) async fn with_client_retries<T, O, F>(
      95            0 :         &self,
      96            0 :         mut op: O,
      97            0 :         jwt: &Option<SecretString>,
      98            0 :         ssl_ca_cert: &Option<Certificate>,
      99            0 :         warn_threshold: u32,
     100            0 :         max_retries: u32,
     101            0 :         timeout: Duration,
     102            0 :         cancel: &CancellationToken,
     103            0 :     ) -> mgmt_api::Result<T>
     104            0 :     where
     105            0 :         O: FnMut(SafekeeperClient) -> F,
     106            0 :         F: std::future::Future<Output = mgmt_api::Result<T>>,
     107            0 :     {
     108            0 :         fn is_fatal(e: &mgmt_api::Error) -> bool {
     109              :             use mgmt_api::Error::*;
     110            0 :             match e {
     111            0 :                 ReceiveBody(_) | ReceiveErrorBody(_) => false,
     112              :                 ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
     113              :                 | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
     114            0 :                 | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
     115            0 :                 ApiError(_, _) => true,
     116            0 :                 Cancelled => true,
     117            0 :                 CreateClient(_) => true,
     118              :             }
     119            0 :         }
     120              : 
     121              :         // TODO: refactor SafekeeperClient and with_client_retires (#11113).
     122            0 :         let mut http_client = reqwest::Client::builder().timeout(timeout);
     123            0 :         if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
     124            0 :             http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
     125            0 :         }
     126            0 :         let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?;
     127              : 
     128            0 :         backoff::retry(
     129            0 :             || {
     130            0 :                 let client = SafekeeperClient::new(
     131            0 :                     self.get_id(),
     132            0 :                     http_client.clone(),
     133            0 :                     self.base_url(),
     134            0 :                     jwt.clone(),
     135            0 :                 );
     136            0 : 
     137            0 :                 let node_cancel_fut = self.cancel.cancelled();
     138            0 : 
     139            0 :                 let op_fut = op(client);
     140              : 
     141            0 :                 async {
     142            0 :                     tokio::select! {
     143            0 :                         r = op_fut=> {r},
     144            0 :                         _ = node_cancel_fut => {
     145            0 :                         Err(mgmt_api::Error::Cancelled)
     146              :                     }}
     147            0 :                 }
     148            0 :             },
     149            0 :             is_fatal,
     150            0 :             warn_threshold,
     151            0 :             max_retries,
     152            0 :             &format!(
     153            0 :                 "Call to safekeeper {} ({}) management API",
     154            0 :                 self.id,
     155            0 :                 self.base_url(),
     156            0 :             ),
     157            0 :             cancel,
     158            0 :         )
     159            0 :         .await
     160            0 :         .unwrap_or(Err(mgmt_api::Error::Cancelled))
     161            0 :     }
     162              : 
     163            0 :     pub(crate) fn update_from_record(
     164            0 :         &mut self,
     165            0 :         record: crate::persistence::SafekeeperUpsert,
     166            0 :     ) -> anyhow::Result<()> {
     167            0 :         let crate::persistence::SafekeeperUpsert {
     168            0 :             active: _,
     169            0 :             availability_zone_id: _,
     170            0 :             host,
     171            0 :             http_port,
     172            0 :             https_port,
     173            0 :             id,
     174            0 :             port: _,
     175            0 :             region_id: _,
     176            0 :             version: _,
     177            0 :         } = record.clone();
     178            0 :         if id != self.id.0 as i64 {
     179              :             // The way the function is called ensures this. If we regress on that, it's a bug.
     180            0 :             panic!(
     181            0 :                 "id can't be changed via update_from_record function: {id} != {}",
     182            0 :                 self.id.0
     183            0 :             );
     184            0 :         }
     185            0 :         if self.use_https && https_port.is_none() {
     186            0 :             anyhow::bail!(
     187            0 :                 "cannot update safekeeper {id}: \
     188            0 :                 https is enabled, but https port is not specified"
     189            0 :             );
     190            0 :         }
     191            0 :         self.skp =
     192            0 :             crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
     193            0 :         self.listen_http_port = http_port as u16;
     194            0 :         self.listen_https_port = https_port.map(|x| x as u16);
     195            0 :         self.listen_http_addr = host;
     196            0 :         Ok(())
     197            0 :     }
     198              : }
        

Generated by: LCOV version 2.1-beta