LCOV - code coverage report
Current view: top level - compute_tools/src - lsn_lease.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 141 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 9 0

            Line data    Source code
       1              : use std::str::FromStr;
       2              : use std::sync::Arc;
       3              : use std::thread;
       4              : use std::time::{Duration, SystemTime};
       5              : 
       6              : use anyhow::{Result, bail};
       7              : use compute_api::spec::{ComputeMode, PageserverProtocol};
       8              : use itertools::Itertools as _;
       9              : use pageserver_page_api as page_api;
      10              : use postgres::{NoTls, SimpleQueryMessage};
      11              : use tracing::{info, warn};
      12              : use utils::id::{TenantId, TimelineId};
      13              : use utils::lsn::Lsn;
      14              : use utils::shard::{ShardCount, ShardNumber, TenantShardId};
      15              : 
      16              : use crate::compute::ComputeNode;
      17              : 
      18              : /// Spawns a background thread to periodically renew LSN leases for static compute.
      19              : /// Do nothing if the compute is not in static mode.
      20            0 : pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
      21            0 :     let (tenant_id, timeline_id, lsn) = {
      22            0 :         let state = compute.state.lock().unwrap();
      23            0 :         let spec = state.pspec.as_ref().expect("Spec must be set");
      24            0 :         match spec.spec.mode {
      25            0 :             ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
      26            0 :             _ => return,
      27              :         }
      28              :     };
      29            0 :     let compute = compute.clone();
      30              : 
      31            0 :     let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
      32            0 :     thread::spawn(move || {
      33            0 :         let _entered = span.entered();
      34            0 :         if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
      35              :             // TODO: might need stronger error feedback than logging an warning.
      36            0 :             warn!("Exited with error: {e}");
      37            0 :         }
      38            0 :     });
      39            0 : }
      40              : 
      41              : /// Renews lsn lease periodically so static compute are not affected by GC.
      42            0 : fn lsn_lease_bg_task(
      43            0 :     compute: Arc<ComputeNode>,
      44            0 :     tenant_id: TenantId,
      45            0 :     timeline_id: TimelineId,
      46            0 :     lsn: Lsn,
      47            0 : ) -> Result<()> {
      48              :     loop {
      49            0 :         let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
      50            0 :         let valid_duration = valid_until
      51            0 :             .duration_since(SystemTime::now())
      52            0 :             .unwrap_or(Duration::ZERO);
      53              : 
      54              :         // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
      55            0 :         let sleep_duration = valid_duration
      56            0 :             .saturating_sub(Duration::from_secs(60))
      57            0 :             .max(valid_duration / 2);
      58              : 
      59            0 :         info!(
      60            0 :             "Request succeeded, sleeping for {} seconds",
      61            0 :             sleep_duration.as_secs()
      62              :         );
      63            0 :         compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
      64              :     }
      65            0 : }
      66              : 
      67              : /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
      68              : /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
      69            0 : fn acquire_lsn_lease_with_retry(
      70            0 :     compute: &Arc<ComputeNode>,
      71            0 :     tenant_id: TenantId,
      72            0 :     timeline_id: TimelineId,
      73            0 :     lsn: Lsn,
      74            0 : ) -> Result<SystemTime> {
      75            0 :     let mut attempts = 0usize;
      76            0 :     let mut retry_period_ms: f64 = 500.0;
      77              :     const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
      78              : 
      79              :     loop {
      80              :         // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
      81            0 :         let (connstrings, auth) = {
      82            0 :             let state = compute.state.lock().unwrap();
      83            0 :             let spec = state.pspec.as_ref().expect("spec must be set");
      84            0 :             (
      85            0 :                 spec.pageserver_connstr.clone(),
      86            0 :                 spec.storage_auth_token.clone(),
      87            0 :             )
      88            0 :         };
      89              : 
      90            0 :         let result =
      91            0 :             try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
      92            0 :         match result {
      93            0 :             Ok(Some(res)) => {
      94            0 :                 return Ok(res);
      95              :             }
      96              :             Ok(None) => {
      97            0 :                 bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
      98              :             }
      99            0 :             Err(e) => {
     100            0 :                 warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
     101              : 
     102            0 :                 compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
     103            0 :                     retry_period_ms as u64,
     104            0 :                 ));
     105            0 :                 retry_period_ms *= 1.5;
     106            0 :                 retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
     107              :             }
     108              :         }
     109            0 :         attempts += 1;
     110              :     }
     111            0 : }
     112              : 
     113              : /// Tries to acquire LSN leases on all Pageserver shards.
     114            0 : fn try_acquire_lsn_lease(
     115            0 :     connstrings: &str,
     116            0 :     auth: Option<&str>,
     117            0 :     tenant_id: TenantId,
     118            0 :     timeline_id: TimelineId,
     119            0 :     lsn: Lsn,
     120            0 : ) -> Result<Option<SystemTime>> {
     121            0 :     let connstrings = connstrings.split(',').collect_vec();
     122            0 :     let shard_count = connstrings.len();
     123            0 :     let mut leases = Vec::new();
     124              : 
     125            0 :     for (shard_number, &connstring) in connstrings.iter().enumerate() {
     126            0 :         let tenant_shard_id = match shard_count {
     127            0 :             0 | 1 => TenantShardId::unsharded(tenant_id),
     128            0 :             shard_count => TenantShardId {
     129            0 :                 tenant_id,
     130            0 :                 shard_number: ShardNumber(shard_number as u8),
     131            0 :                 shard_count: ShardCount::new(shard_count as u8),
     132            0 :             },
     133              :         };
     134              : 
     135            0 :         let lease = match PageserverProtocol::from_connstring(connstring)? {
     136              :             PageserverProtocol::Libpq => {
     137            0 :                 acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
     138              :             }
     139              :             PageserverProtocol::Grpc => {
     140            0 :                 acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
     141              :             }
     142              :         };
     143            0 :         leases.push(lease);
     144              :     }
     145              : 
     146            0 :     Ok(leases.into_iter().min().flatten())
     147            0 : }
     148              : 
     149              : /// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
     150              : /// postgresql:// scheme.
     151            0 : fn acquire_lsn_lease_libpq(
     152            0 :     connstring: &str,
     153            0 :     auth: Option<&str>,
     154            0 :     tenant_shard_id: TenantShardId,
     155            0 :     timeline_id: TimelineId,
     156            0 :     lsn: Lsn,
     157            0 : ) -> Result<Option<SystemTime>> {
     158            0 :     let mut config = postgres::Config::from_str(connstring)?;
     159            0 :     if let Some(auth) = auth {
     160            0 :         config.password(auth);
     161            0 :     }
     162            0 :     let mut client = config.connect(NoTls)?;
     163            0 :     let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
     164            0 :     let res = client.simple_query(&cmd)?;
     165            0 :     let msg = match res.first() {
     166            0 :         Some(msg) => msg,
     167            0 :         None => bail!("empty response"),
     168              :     };
     169            0 :     let row = match msg {
     170            0 :         SimpleQueryMessage::Row(row) => row,
     171            0 :         _ => bail!("error parsing lsn lease response"),
     172              :     };
     173              : 
     174              :     // Note: this will be None if a lease is explicitly not granted.
     175            0 :     let valid_until_str = row.get("valid_until");
     176              : 
     177            0 :     let valid_until = valid_until_str.map(|s| {
     178            0 :         SystemTime::UNIX_EPOCH
     179            0 :             .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
     180            0 :             .expect("Time larger than max SystemTime could handle")
     181            0 :     });
     182            0 :     Ok(valid_until)
     183            0 : }
     184              : 
     185              : /// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
     186              : /// grpc:// scheme.
     187            0 : fn acquire_lsn_lease_grpc(
     188            0 :     connstring: &str,
     189            0 :     auth: Option<&str>,
     190            0 :     tenant_shard_id: TenantShardId,
     191            0 :     timeline_id: TimelineId,
     192            0 :     lsn: Lsn,
     193            0 : ) -> Result<Option<SystemTime>> {
     194            0 :     tokio::runtime::Handle::current().block_on(async move {
     195            0 :         let mut client = page_api::Client::connect(
     196            0 :             connstring.to_string(),
     197            0 :             tenant_shard_id.tenant_id,
     198            0 :             timeline_id,
     199            0 :             tenant_shard_id.to_index(),
     200            0 :             auth.map(String::from),
     201            0 :             None,
     202            0 :         )
     203            0 :         .await?;
     204              : 
     205            0 :         let req = page_api::LeaseLsnRequest { lsn };
     206            0 :         match client.lease_lsn(req).await {
     207            0 :             Ok(expires) => Ok(Some(expires)),
     208              :             // Lease couldn't be acquired because the LSN has been garbage collected.
     209            0 :             Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
     210            0 :             Err(err) => Err(err.into()),
     211              :         }
     212            0 :     })
     213            0 : }
        

Generated by: LCOV version 2.1-beta