LCOV - code coverage report
Current view: top level - compute_tools/src - lsn_lease.rs (source / functions) Coverage Total Hit
Test: 5622bf0bd21bb5ef9fd522287b5558a9745ae85e.info Lines: 0.0 % 168 0
Test Date: 2025-07-30 19:58:24 Functions: 0.0 % 16 0

            Line data    Source code
       1              : use std::str::FromStr;
       2              : use std::sync::Arc;
       3              : use std::time::{Duration, SystemTime};
       4              : 
       5              : use anyhow::{Result, bail};
       6              : use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
       7              : use futures::StreamExt;
       8              : use pageserver_page_api as page_api;
       9              : use postgres::{NoTls, SimpleQueryMessage};
      10              : use tracing::{Instrument, info, warn};
      11              : use utils::id::{TenantId, TimelineId};
      12              : use utils::lsn::Lsn;
      13              : use utils::shard::TenantShardId;
      14              : 
      15              : use crate::compute::ComputeNode;
      16              : 
      17              : /// Spawns a background thread to periodically renew LSN leases for static compute.
      18              : /// Do nothing if the compute is not in static mode. MUST run this within a tokio runtime.
      19            0 : pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
      20            0 :     let (tenant_id, timeline_id, lsn) = {
      21            0 :         let state = compute.state.lock().unwrap();
      22            0 :         let spec = state.pspec.as_ref().expect("Spec must be set");
      23            0 :         match spec.spec.mode {
      24            0 :             ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
      25            0 :             _ => return,
      26              :         }
      27              :     };
      28            0 :     let compute = compute.clone();
      29              : 
      30            0 :     let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
      31            0 :     tokio::spawn(
      32            0 :         async move {
      33            0 :             if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn).await {
      34              :                 // TODO: might need stronger error feedback than logging an warning.
      35            0 :                 warn!("Exited with error: {e}");
      36            0 :             }
      37            0 :         }
      38            0 :         .instrument(span),
      39              :     );
      40            0 : }
      41              : 
      42              : /// Renews lsn lease periodically so static compute are not affected by GC.
      43            0 : async fn lsn_lease_bg_task(
      44            0 :     compute: Arc<ComputeNode>,
      45            0 :     tenant_id: TenantId,
      46            0 :     timeline_id: TimelineId,
      47            0 :     lsn: Lsn,
      48            0 : ) -> Result<()> {
      49              :     loop {
      50            0 :         let valid_until =
      51            0 :             acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn).await?;
      52            0 :         let valid_duration = valid_until
      53            0 :             .duration_since(SystemTime::now())
      54            0 :             .unwrap_or(Duration::ZERO);
      55              : 
      56              :         // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
      57            0 :         let sleep_duration = valid_duration
      58            0 :             .saturating_sub(Duration::from_secs(60))
      59            0 :             .max(valid_duration / 2);
      60              : 
      61            0 :         info!(
      62            0 :             "Request succeeded, sleeping for {} seconds",
      63            0 :             sleep_duration.as_secs()
      64              :         );
      65            0 :         let compute = compute.clone();
      66            0 :         tokio::task::spawn_blocking(move || {
      67            0 :             compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
      68            0 :         })
      69            0 :         .await?;
      70              :     }
      71            0 : }
      72              : 
      73              : /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
      74              : /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
      75            0 : async fn acquire_lsn_lease_with_retry(
      76            0 :     compute: &Arc<ComputeNode>,
      77            0 :     tenant_id: TenantId,
      78            0 :     timeline_id: TimelineId,
      79            0 :     lsn: Lsn,
      80            0 : ) -> Result<SystemTime> {
      81            0 :     let mut attempts = 0usize;
      82            0 :     let mut retry_period_ms: f64 = 500.0;
      83              :     const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
      84              : 
      85              :     loop {
      86              :         // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
      87            0 :         let (conninfo, auth) = {
      88            0 :             let state = compute.state.lock().unwrap();
      89            0 :             let spec = state.pspec.as_ref().expect("spec must be set");
      90            0 :             (
      91            0 :                 spec.pageserver_conninfo.clone(),
      92            0 :                 spec.storage_auth_token.clone(),
      93            0 :             )
      94            0 :         };
      95              : 
      96            0 :         let result =
      97            0 :             try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn).await;
      98            0 :         match result {
      99            0 :             Ok(Some(res)) => {
     100            0 :                 return Ok(res);
     101              :             }
     102              :             Ok(None) => {
     103            0 :                 bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
     104              :             }
     105            0 :             Err(e) => {
     106            0 :                 warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
     107              : 
     108            0 :                 compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
     109            0 :                     retry_period_ms as u64,
     110            0 :                 ));
     111            0 :                 retry_period_ms *= 1.5;
     112            0 :                 retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
     113              :             }
     114              :         }
     115            0 :         attempts += 1;
     116              :     }
     117            0 : }
     118              : 
     119              : /// Tries to acquire LSN leases on all Pageserver shards.
     120            0 : async fn try_acquire_lsn_lease(
     121            0 :     conninfo: PageserverConnectionInfo,
     122            0 :     auth: Option<&str>,
     123            0 :     tenant_id: TenantId,
     124            0 :     timeline_id: TimelineId,
     125            0 :     lsn: Lsn,
     126            0 : ) -> Result<Option<SystemTime>> {
     127              :     const MAX_CONCURRENT_LEASE_CONNECTIONS: usize = 8;
     128              : 
     129            0 :     let mut jobs = Vec::new();
     130            0 :     for (shard_index, shard) in conninfo.shards.into_iter() {
     131            0 :         let tenant_shard_id = TenantShardId {
     132            0 :             tenant_id,
     133            0 :             shard_number: shard_index.shard_number,
     134            0 :             shard_count: shard_index.shard_count,
     135            0 :         };
     136              : 
     137              :         // XXX: If there are more than pageserver for the one shard, do we need to get a
     138              :         // leas on all of them? Currently, that's what we assume, but this is hypothetical
     139              :         // as of this writing, as we never pass the info for more than one pageserver per
     140              :         // shard.
     141              : 
     142            0 :         for shard in shard.pageservers {
     143            0 :             let shard = shard.clone();
     144            0 :             jobs.push(async move {
     145            0 :                 match conninfo.prefer_protocol {
     146              :                     PageserverProtocol::Grpc => {
     147            0 :                         acquire_lsn_lease_grpc(
     148            0 :                             &shard.grpc_url.unwrap(),
     149            0 :                             auth,
     150            0 :                             tenant_shard_id,
     151            0 :                             timeline_id,
     152            0 :                             lsn,
     153            0 :                         )
     154            0 :                         .await
     155              :                     }
     156              :                     PageserverProtocol::Libpq => {
     157            0 :                         acquire_lsn_lease_libpq(
     158            0 :                             &shard.libpq_url.unwrap(),
     159            0 :                             auth,
     160            0 :                             tenant_shard_id,
     161            0 :                             timeline_id,
     162            0 :                             lsn,
     163            0 :                         )
     164            0 :                         .await
     165              :                     }
     166              :                 }
     167            0 :             });
     168              :         }
     169              :     }
     170              : 
     171            0 :     let mut stream = futures::stream::iter(jobs).buffer_unordered(MAX_CONCURRENT_LEASE_CONNECTIONS);
     172            0 :     let mut leases = Vec::new();
     173            0 :     while let Some(res) = stream.next().await {
     174            0 :         let lease = res?;
     175            0 :         leases.push(lease);
     176              :     }
     177            0 :     Ok(leases.into_iter().flatten().min())
     178            0 : }
     179              : 
     180              : /// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
     181              : /// postgresql:// scheme.
     182            0 : async fn acquire_lsn_lease_libpq(
     183            0 :     connstring: &str,
     184            0 :     auth: Option<&str>,
     185            0 :     tenant_shard_id: TenantShardId,
     186            0 :     timeline_id: TimelineId,
     187            0 :     lsn: Lsn,
     188            0 : ) -> Result<Option<SystemTime>> {
     189            0 :     let mut config = tokio_postgres::Config::from_str(connstring)?;
     190            0 :     if let Some(auth) = auth {
     191            0 :         config.password(auth);
     192            0 :     }
     193            0 :     let (client, connection) = config.connect(NoTls).await?;
     194              : 
     195            0 :     tokio::spawn(async move {
     196            0 :         if let Err(e) = connection.await {
     197            0 :             tracing::warn!("lease lsn connection error: {}", e);
     198            0 :         }
     199            0 :     });
     200              : 
     201            0 :     let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
     202            0 :     let res = client.simple_query(&cmd).await?;
     203            0 :     let msg = match res.first() {
     204            0 :         Some(msg) => msg,
     205            0 :         None => bail!("empty response"),
     206              :     };
     207            0 :     let row = match msg {
     208            0 :         SimpleQueryMessage::Row(row) => row,
     209            0 :         _ => bail!("error parsing lsn lease response"),
     210              :     };
     211              : 
     212              :     // Note: this will be None if a lease is explicitly not granted.
     213            0 :     let valid_until_str = row.get("valid_until");
     214              : 
     215            0 :     let valid_until = valid_until_str.map(|s| {
     216            0 :         SystemTime::UNIX_EPOCH
     217            0 :             .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
     218            0 :             .expect("Time larger than max SystemTime could handle")
     219            0 :     });
     220              : 
     221            0 :     Ok(valid_until)
     222            0 : }
     223              : 
     224              : /// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
     225              : /// grpc:// scheme.
     226            0 : async fn acquire_lsn_lease_grpc(
     227            0 :     connstring: &str,
     228            0 :     auth: Option<&str>,
     229            0 :     tenant_shard_id: TenantShardId,
     230            0 :     timeline_id: TimelineId,
     231            0 :     lsn: Lsn,
     232            0 : ) -> Result<Option<SystemTime>> {
     233            0 :     let mut client = page_api::Client::connect(
     234            0 :         connstring.to_string(),
     235            0 :         tenant_shard_id.tenant_id,
     236            0 :         timeline_id,
     237            0 :         tenant_shard_id.to_index(),
     238            0 :         auth.map(String::from),
     239            0 :         None,
     240            0 :     )
     241            0 :     .await?;
     242              : 
     243            0 :     let req = page_api::LeaseLsnRequest { lsn };
     244            0 :     match client.lease_lsn(req).await {
     245            0 :         Ok(expires) => Ok(Some(expires)),
     246              :         // Lease couldn't be acquired because the LSN has been garbage collected.
     247            0 :         Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
     248            0 :         Err(err) => Err(err.into()),
     249              :     }
     250            0 : }
        

Generated by: LCOV version 2.1-beta