LCOV - code coverage report
Current view: top level - compute_tools/src - compute_promote.rs (source / functions) Coverage Total Hit
Test: d6474ad0ed8f0fd27d73da2c09e7db82030cd3e5.info Lines: 0.0 % 123 0
Test Date: 2025-07-31 21:24:17 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use crate::compute::ComputeNode;
       2              : use anyhow::{Context, bail};
       3              : use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
       4              : use std::sync::Arc;
       5              : use std::time::Instant;
       6              : use tracing::info;
       7              : 
       8              : impl ComputeNode {
       9              :     /// Returns only when promote fails or succeeds. If http client calling this function
      10              :     /// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
      11              :     /// Called by control plane on secondary after primary endpoint is terminated
      12              :     /// Has a failpoint "compute-promotion"
      13            0 :     pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
      14            0 :         let this = self.clone();
      15            0 :         let promote_fn = async move || match this.promote_impl(cfg).await {
      16            0 :             Ok(state) => state,
      17            0 :             Err(err) => {
      18            0 :                 tracing::error!(%err, "promoting replica");
      19            0 :                 let error = format!("{err:#}");
      20            0 :                 PromoteState::Failed { error }
      21              :             }
      22            0 :         };
      23            0 :         let start_promotion = || {
      24            0 :             let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
      25            0 :             tokio::spawn(async move { tx.send(promote_fn().await) });
      26            0 :             rx
      27            0 :         };
      28              : 
      29              :         let mut task;
      30              :         // promote_impl locks self.state so we need to unlock it before calling task.changed()
      31              :         {
      32            0 :             let promote_state = &mut self.state.lock().unwrap().promote_state;
      33            0 :             task = promote_state.get_or_insert_with(start_promotion).clone()
      34              :         }
      35            0 :         if task.changed().await.is_err() {
      36            0 :             let error = "promote sender dropped".to_string();
      37            0 :             return PromoteState::Failed { error };
      38            0 :         }
      39            0 :         task.borrow().clone()
      40            0 :     }
      41              : 
      42            0 :     async fn promote_impl(self: &Arc<Self>, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
      43              :         #[allow(unused_mut)]
      44            0 :         let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
      45            0 :         let safekeepers_str = new_pspec.safekeeper_connstrings.join(",");
      46            0 :         if safekeepers_str.is_empty() {
      47            0 :             bail!("empty safekeepers list");
      48            0 :         }
      49              : 
      50              :         {
      51            0 :             let state = self.state.lock().unwrap();
      52            0 :             let mode = &state.pspec.as_ref().unwrap().spec.mode;
      53            0 :             if *mode != compute_api::spec::ComputeMode::Replica {
      54            0 :                 bail!("compute mode \"{}\" is not replica", mode.to_type_str());
      55            0 :             }
      56            0 :             match &state.lfc_prewarm_state {
      57            0 :                 status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
      58            0 :                     bail!("compute {status}")
      59              :                 }
      60            0 :                 LfcPrewarmState::Failed { error } => {
      61            0 :                     tracing::warn!(%error, "compute prewarm failed")
      62              :                 }
      63            0 :                 _ => {}
      64              :             }
      65              :         }
      66              : 
      67            0 :         let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
      68            0 :             .await
      69            0 :             .context("connecting to postgres")?;
      70            0 :         let mut now = Instant::now();
      71              : 
      72            0 :         let primary_lsn = cfg.wal_flush_lsn;
      73            0 :         let mut standby_lsn = utils::lsn::Lsn::INVALID;
      74              :         const RETRIES: i32 = 20;
      75            0 :         for i in 0..=RETRIES {
      76            0 :             let row = client
      77            0 :                 .query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
      78            0 :                 .await
      79            0 :                 .context("getting last replay lsn")?;
      80            0 :             let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
      81            0 :             standby_lsn = lsn.into();
      82            0 :             if standby_lsn >= primary_lsn {
      83            0 :                 break;
      84            0 :             }
      85            0 :             info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
      86            0 :             tokio::time::sleep(std::time::Duration::from_secs(1)).await;
      87              :         }
      88            0 :         if standby_lsn < primary_lsn {
      89            0 :             bail!("didn't catch up with primary in {RETRIES} retries");
      90            0 :         }
      91            0 :         let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
      92            0 :         now = Instant::now();
      93              : 
      94              :         // $1 doesn't work with ALTER SYSTEM SET
      95            0 :         let safekeepers_sql = format!("ALTER SYSTEM SET neon.safekeepers='{safekeepers_str}'");
      96            0 :         client
      97            0 :             .query(&safekeepers_sql, &[])
      98            0 :             .await
      99            0 :             .context("setting safekeepers")?;
     100            0 :         client
     101            0 :             .query(
     102            0 :                 "ALTER SYSTEM SET synchronous_standby_names=walproposer",
     103            0 :                 &[],
     104            0 :             )
     105            0 :             .await
     106            0 :             .context("setting synchronous_standby_names")?;
     107            0 :         client
     108            0 :             .query("SELECT pg_catalog.pg_reload_conf()", &[])
     109            0 :             .await
     110            0 :             .context("reloading postgres config")?;
     111              : 
     112              :         #[cfg(feature = "testing")]
     113            0 :         fail::fail_point!("compute-promotion", |_| bail!(
     114            0 :             "compute-promotion failpoint"
     115              :         ));
     116              : 
     117            0 :         let row = client
     118            0 :             .query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
     119            0 :             .await
     120            0 :             .context("pg_promote")?;
     121            0 :         if !row.get::<usize, bool>(0) {
     122            0 :             bail!("pg_promote() failed");
     123            0 :         }
     124            0 :         let pg_promote_time_ms = now.elapsed().as_millis() as u32;
     125            0 :         let now = Instant::now();
     126              : 
     127            0 :         let row = client
     128            0 :             .query_one("SHOW transaction_read_only", &[])
     129            0 :             .await
     130            0 :             .context("getting transaction_read_only")?;
     131            0 :         if row.get::<usize, &str>(0) == "on" {
     132            0 :             bail!("replica in read only mode after promotion");
     133            0 :         }
     134              : 
     135              :         // Already checked validity in http handler
     136              :         {
     137            0 :             let mut state = self.state.lock().unwrap();
     138              : 
     139              :             // Local setup has different ports for pg process (port=) for primary and secondary.
     140              :             // Primary is stopped so we need secondary's "port" value
     141              :             #[cfg(feature = "testing")]
     142              :             {
     143            0 :                 let old_spec = &state.pspec.as_ref().unwrap().spec;
     144            0 :                 let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
     145            0 :                     bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
     146              :                 };
     147            0 :                 let set: std::collections::HashMap<&str, &str> = old_conf
     148            0 :                     .split_terminator('\n')
     149            0 :                     .map(|e| e.split_once("=").expect("invalid item"))
     150            0 :                     .collect();
     151              : 
     152            0 :                 let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
     153            0 :                     bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
     154              :                 };
     155            0 :                 new_conf.push_str(&format!("port={}\n", set["port"]));
     156              :             }
     157              : 
     158            0 :             tracing::debug!("applied spec: {:#?}", new_pspec.spec);
     159            0 :             if self.params.lakebase_mode {
     160            0 :                 ComputeNode::set_spec(&self.params, &mut state, new_pspec);
     161            0 :             } else {
     162            0 :                 state.pspec = Some(new_pspec);
     163            0 :             }
     164              :         }
     165              : 
     166            0 :         info!("applied new spec, reconfiguring as primary");
     167              :         // reconfigure calls apply_spec_sql which blocks on a current runtime. To avoid panicking
     168              :         // due to nested runtimes, wait on this task in a blocking way
     169            0 :         let this = self.clone();
     170            0 :         tokio::task::spawn_blocking(move || this.reconfigure()).await??;
     171            0 :         let reconfigure_time_ms = now.elapsed().as_millis() as u32;
     172              : 
     173            0 :         Ok(PromoteState::Completed {
     174            0 :             lsn_wait_time_ms,
     175            0 :             pg_promote_time_ms,
     176            0 :             reconfigure_time_ms,
     177            0 :         })
     178            0 :     }
     179              : }
        

Generated by: LCOV version 2.1-beta