LCOV - code coverage report
Current view: top level - compute_tools/src - compute_promote.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 0.0 % 120 0
Test Date: 2025-07-31 15:59:03 Functions: 0.0 % 10 0

            Line data    Source code
       1              : use crate::compute::ComputeNode;
       2              : use anyhow::{Context, bail};
       3              : use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
       4              : use std::time::Instant;
       5              : use tracing::info;
       6              : 
       7              : impl ComputeNode {
       8              :     /// Returns only when promote fails or succeeds. If http client calling this function
       9              :     /// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
      10              :     /// Called by control plane on secondary after primary endpoint is terminated
      11              :     /// Has a failpoint "compute-promotion"
      12            0 :     pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
      13            0 :         let this = self.clone();
      14            0 :         let promote_fn = async move || match this.promote_impl(cfg).await {
      15            0 :             Ok(state) => state,
      16            0 :             Err(err) => {
      17            0 :                 tracing::error!(%err, "promoting replica");
      18            0 :                 let error = format!("{err:#}");
      19            0 :                 PromoteState::Failed { error }
      20              :             }
      21            0 :         };
      22            0 :         let start_promotion = || {
      23            0 :             let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
      24            0 :             tokio::spawn(async move { tx.send(promote_fn().await) });
      25            0 :             rx
      26            0 :         };
      27              : 
      28              :         let mut task;
      29              :         // promote_impl locks self.state so we need to unlock it before calling task.changed()
      30              :         {
      31            0 :             let promote_state = &mut self.state.lock().unwrap().promote_state;
      32            0 :             task = promote_state.get_or_insert_with(start_promotion).clone()
      33              :         }
      34            0 :         if task.changed().await.is_err() {
      35            0 :             let error = "promote sender dropped".to_string();
      36            0 :             return PromoteState::Failed { error };
      37            0 :         }
      38            0 :         task.borrow().clone()
      39            0 :     }
      40              : 
      41            0 :     async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
      42              :         {
      43            0 :             let state = self.state.lock().unwrap();
      44            0 :             let mode = &state.pspec.as_ref().unwrap().spec.mode;
      45            0 :             if *mode != compute_api::spec::ComputeMode::Replica {
      46            0 :                 bail!("compute mode \"{}\" is not replica", mode.to_type_str());
      47            0 :             }
      48            0 :             match &state.lfc_prewarm_state {
      49            0 :                 status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
      50            0 :                     bail!("compute {status}")
      51              :                 }
      52            0 :                 LfcPrewarmState::Failed { error } => {
      53            0 :                     tracing::warn!(%error, "compute prewarm failed")
      54              :                 }
      55            0 :                 _ => {}
      56              :             }
      57              :         }
      58              : 
      59            0 :         let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
      60            0 :             .await
      61            0 :             .context("connecting to postgres")?;
      62            0 :         let mut now = Instant::now();
      63              : 
      64            0 :         let primary_lsn = cfg.wal_flush_lsn;
      65            0 :         let mut standby_lsn = utils::lsn::Lsn::INVALID;
      66              :         const RETRIES: i32 = 20;
      67            0 :         for i in 0..=RETRIES {
      68            0 :             let row = client
      69            0 :                 .query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
      70            0 :                 .await
      71            0 :                 .context("getting last replay lsn")?;
      72            0 :             let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
      73            0 :             standby_lsn = lsn.into();
      74            0 :             if standby_lsn >= primary_lsn {
      75            0 :                 break;
      76            0 :             }
      77            0 :             info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
      78            0 :             tokio::time::sleep(std::time::Duration::from_secs(1)).await;
      79              :         }
      80            0 :         if standby_lsn < primary_lsn {
      81            0 :             bail!("didn't catch up with primary in {RETRIES} retries");
      82            0 :         }
      83            0 :         let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
      84            0 :         now = Instant::now();
      85              : 
      86              :         // using $1 doesn't work with ALTER SYSTEM SET
      87            0 :         let safekeepers_sql = format!(
      88            0 :             "ALTER SYSTEM SET neon.safekeepers='{}'",
      89            0 :             cfg.spec.safekeeper_connstrings.join(",")
      90              :         );
      91            0 :         client
      92            0 :             .query(&safekeepers_sql, &[])
      93            0 :             .await
      94            0 :             .context("setting safekeepers")?;
      95            0 :         client
      96            0 :             .query(
      97            0 :                 "ALTER SYSTEM SET synchronous_standby_names=walproposer",
      98            0 :                 &[],
      99            0 :             )
     100            0 :             .await
     101            0 :             .context("setting synchronous_standby_names")?;
     102            0 :         client
     103            0 :             .query("SELECT pg_catalog.pg_reload_conf()", &[])
     104            0 :             .await
     105            0 :             .context("reloading postgres config")?;
     106              : 
     107              :         #[cfg(feature = "testing")]
     108            0 :         fail::fail_point!("compute-promotion", |_| bail!(
     109            0 :             "compute-promotion failpoint"
     110              :         ));
     111              : 
     112            0 :         let row = client
     113            0 :             .query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
     114            0 :             .await
     115            0 :             .context("pg_promote")?;
     116            0 :         if !row.get::<usize, bool>(0) {
     117            0 :             bail!("pg_promote() failed");
     118            0 :         }
     119            0 :         let pg_promote_time_ms = now.elapsed().as_millis() as u32;
     120            0 :         let now = Instant::now();
     121              : 
     122            0 :         let row = client
     123            0 :             .query_one("SHOW transaction_read_only", &[])
     124            0 :             .await
     125            0 :             .context("getting transaction_read_only")?;
     126            0 :         if row.get::<usize, &str>(0) == "on" {
     127            0 :             bail!("replica in read only mode after promotion");
     128            0 :         }
     129              : 
     130              :         // Already checked validity in http handler
     131              :         #[allow(unused_mut)]
     132            0 :         let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
     133              :         {
     134            0 :             let mut state = self.state.lock().unwrap();
     135              : 
     136              :             // Local setup has different ports for pg process (port=) for primary and secondary.
     137              :             // Primary is stopped so we need secondary's "port" value
     138              :             #[cfg(feature = "testing")]
     139              :             {
     140            0 :                 let old_spec = &state.pspec.as_ref().unwrap().spec;
     141            0 :                 let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
     142            0 :                     bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
     143              :                 };
     144            0 :                 let set: std::collections::HashMap<&str, &str> = old_conf
     145            0 :                     .split_terminator('\n')
     146            0 :                     .map(|e| e.split_once("=").expect("invalid item"))
     147            0 :                     .collect();
     148              : 
     149            0 :                 let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
     150            0 :                     bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
     151              :                 };
     152            0 :                 new_conf.push_str(&format!("port={}\n", set["port"]));
     153              :             }
     154              : 
     155            0 :             tracing::debug!("applied spec: {:#?}", new_pspec.spec);
     156            0 :             if self.params.lakebase_mode {
     157            0 :                 ComputeNode::set_spec(&self.params, &mut state, new_pspec);
     158            0 :             } else {
     159            0 :                 state.pspec = Some(new_pspec);
     160            0 :             }
     161              :         }
     162              : 
     163            0 :         info!("applied new spec, reconfiguring as primary");
     164            0 :         self.reconfigure()?;
     165            0 :         let reconfigure_time_ms = now.elapsed().as_millis() as u32;
     166              : 
     167            0 :         Ok(PromoteState::Completed {
     168            0 :             lsn_wait_time_ms,
     169            0 :             pg_promote_time_ms,
     170            0 :             reconfigure_time_ms,
     171            0 :         })
     172            0 :     }
     173              : }
        

Generated by: LCOV version 2.1-beta