LCOV - code coverage report
Current view: top level - compute_tools/src - lsn_lease.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 0.0 % 134 0
Test Date: 2025-03-12 00:01:28 Functions: 0.0 % 9 0

            Line data    Source code
       1              : use std::str::FromStr;
       2              : use std::sync::Arc;
       3              : use std::thread;
       4              : use std::time::{Duration, SystemTime};
       5              : 
       6              : use anyhow::{Result, bail};
       7              : use compute_api::spec::ComputeMode;
       8              : use postgres::{NoTls, SimpleQueryMessage};
       9              : use tracing::{info, warn};
      10              : use utils::id::{TenantId, TimelineId};
      11              : use utils::lsn::Lsn;
      12              : use utils::shard::{ShardCount, ShardNumber, TenantShardId};
      13              : 
      14              : use crate::compute::ComputeNode;
      15              : 
      16              : /// Spawns a background thread to periodically renew LSN leases for static compute.
      17              : /// Do nothing if the compute is not in static mode.
      18            0 : pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
      19            0 :     let (tenant_id, timeline_id, lsn) = {
      20            0 :         let state = compute.state.lock().unwrap();
      21            0 :         let spec = state.pspec.as_ref().expect("Spec must be set");
      22            0 :         match spec.spec.mode {
      23            0 :             ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
      24            0 :             _ => return,
      25              :         }
      26              :     };
      27            0 :     let compute = compute.clone();
      28              : 
      29            0 :     let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
      30            0 :     thread::spawn(move || {
      31            0 :         let _entered = span.entered();
      32            0 :         if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
      33              :             // TODO: might need stronger error feedback than logging an warning.
      34            0 :             warn!("Exited with error: {e}");
      35            0 :         }
      36            0 :     });
      37            0 : }
      38              : 
      39              : /// Renews lsn lease periodically so static compute are not affected by GC.
      40            0 : fn lsn_lease_bg_task(
      41            0 :     compute: Arc<ComputeNode>,
      42            0 :     tenant_id: TenantId,
      43            0 :     timeline_id: TimelineId,
      44            0 :     lsn: Lsn,
      45            0 : ) -> Result<()> {
      46              :     loop {
      47            0 :         let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
      48            0 :         let valid_duration = valid_until
      49            0 :             .duration_since(SystemTime::now())
      50            0 :             .unwrap_or(Duration::ZERO);
      51            0 : 
      52            0 :         // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
      53            0 :         let sleep_duration = valid_duration
      54            0 :             .saturating_sub(Duration::from_secs(60))
      55            0 :             .max(valid_duration / 2);
      56            0 : 
      57            0 :         info!(
      58            0 :             "Request succeeded, sleeping for {} seconds",
      59            0 :             sleep_duration.as_secs()
      60              :         );
      61            0 :         compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
      62              :     }
      63            0 : }
      64              : 
      65              : /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
      66              : /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
      67            0 : fn acquire_lsn_lease_with_retry(
      68            0 :     compute: &Arc<ComputeNode>,
      69            0 :     tenant_id: TenantId,
      70            0 :     timeline_id: TimelineId,
      71            0 :     lsn: Lsn,
      72            0 : ) -> Result<SystemTime> {
      73            0 :     let mut attempts = 0usize;
      74            0 :     let mut retry_period_ms: f64 = 500.0;
      75              :     const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
      76              : 
      77              :     loop {
      78              :         // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
      79            0 :         let configs = {
      80            0 :             let state = compute.state.lock().unwrap();
      81            0 : 
      82            0 :             let spec = state.pspec.as_ref().expect("spec must be set");
      83            0 : 
      84            0 :             let conn_strings = spec.pageserver_connstr.split(',');
      85            0 : 
      86            0 :             conn_strings
      87            0 :                 .map(|connstr| {
      88            0 :                     let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
      89            0 :                     if let Some(storage_auth_token) = &spec.storage_auth_token {
      90            0 :                         config.password(storage_auth_token.clone());
      91            0 :                     }
      92            0 :                     config
      93            0 :                 })
      94            0 :                 .collect::<Vec<_>>()
      95            0 :         };
      96            0 : 
      97            0 :         let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs);
      98            0 :         match result {
      99            0 :             Ok(Some(res)) => {
     100            0 :                 return Ok(res);
     101              :             }
     102              :             Ok(None) => {
     103            0 :                 bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
     104              :             }
     105            0 :             Err(e) => {
     106            0 :                 warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
     107              : 
     108            0 :                 compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
     109            0 :                     retry_period_ms as u64,
     110            0 :                 ));
     111            0 :                 retry_period_ms *= 1.5;
     112            0 :                 retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
     113            0 :             }
     114            0 :         }
     115            0 :         attempts += 1;
     116              :     }
     117            0 : }
     118              : 
     119              : /// Tries to acquire an LSN lease through PS page_service API.
     120            0 : fn try_acquire_lsn_lease(
     121            0 :     tenant_id: TenantId,
     122            0 :     timeline_id: TimelineId,
     123            0 :     lsn: Lsn,
     124            0 :     configs: &[postgres::Config],
     125            0 : ) -> Result<Option<SystemTime>> {
     126            0 :     fn get_valid_until(
     127            0 :         config: &postgres::Config,
     128            0 :         tenant_shard_id: TenantShardId,
     129            0 :         timeline_id: TimelineId,
     130            0 :         lsn: Lsn,
     131            0 :     ) -> Result<Option<SystemTime>> {
     132            0 :         let mut client = config.connect(NoTls)?;
     133            0 :         let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
     134            0 :         let res = client.simple_query(&cmd)?;
     135            0 :         let msg = match res.first() {
     136            0 :             Some(msg) => msg,
     137            0 :             None => bail!("empty response"),
     138              :         };
     139            0 :         let row = match msg {
     140            0 :             SimpleQueryMessage::Row(row) => row,
     141            0 :             _ => bail!("error parsing lsn lease response"),
     142              :         };
     143              : 
     144              :         // Note: this will be None if a lease is explicitly not granted.
     145            0 :         let valid_until_str = row.get("valid_until");
     146            0 : 
     147            0 :         let valid_until = valid_until_str.map(|s| {
     148            0 :             SystemTime::UNIX_EPOCH
     149            0 :                 .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
     150            0 :                 .expect("Time larger than max SystemTime could handle")
     151            0 :         });
     152            0 :         Ok(valid_until)
     153            0 :     }
     154              : 
     155            0 :     let shard_count = configs.len();
     156              : 
     157            0 :     let valid_until = if shard_count > 1 {
     158            0 :         configs
     159            0 :             .iter()
     160            0 :             .enumerate()
     161            0 :             .map(|(shard_number, config)| {
     162            0 :                 let tenant_shard_id = TenantShardId {
     163            0 :                     tenant_id,
     164            0 :                     shard_count: ShardCount::new(shard_count as u8),
     165            0 :                     shard_number: ShardNumber(shard_number as u8),
     166            0 :                 };
     167            0 :                 get_valid_until(config, tenant_shard_id, timeline_id, lsn)
     168            0 :             })
     169            0 :             .collect::<Result<Vec<Option<SystemTime>>>>()?
     170            0 :             .into_iter()
     171            0 :             .min()
     172            0 :             .unwrap()
     173              :     } else {
     174            0 :         get_valid_until(
     175            0 :             &configs[0],
     176            0 :             TenantShardId::unsharded(tenant_id),
     177            0 :             timeline_id,
     178            0 :             lsn,
     179            0 :         )?
     180              :     };
     181              : 
     182            0 :     Ok(valid_until)
     183            0 : }
        

Generated by: LCOV version 2.1-beta