LCOV - code coverage report
Current view: top level - compute_tools/src - compute_promote.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 0.0 % 114 0
Test Date: 2025-07-26 17:20:05 Functions: 0.0 % 13 0

            Line data    Source code
       1              : use crate::compute::ComputeNode;
       2              : use anyhow::{Context, Result, bail};
       3              : use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
       4              : use compute_api::spec::ComputeMode;
       5              : use itertools::Itertools;
       6              : use std::collections::HashMap;
       7              : use std::{sync::Arc, time::Duration};
       8              : use tokio::time::sleep;
       9              : use tracing::info;
      10              : use utils::lsn::Lsn;
      11              : 
      12              : impl ComputeNode {
      13              :     /// Returns only when promote fails or succeeds. If a network error occurs
      14              :     /// and http client disconnects, this does not stop promotion, and subsequent
      15              :     /// calls block until promote finishes.
      16              :     /// Called by control plane on secondary after primary endpoint is terminated
      17              :     /// Has a failpoint "compute-promotion"
      18            0 :     pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
      19            0 :         let cloned = self.clone();
      20            0 :         let promote_fn = async move || {
      21            0 :             let Err(err) = cloned.promote_impl(cfg).await else {
      22            0 :                 return PromoteState::Completed;
      23              :             };
      24            0 :             tracing::error!(%err, "promoting");
      25            0 :             PromoteState::Failed {
      26            0 :                 error: format!("{err:#}"),
      27            0 :             }
      28            0 :         };
      29              : 
      30            0 :         let start_promotion = || {
      31            0 :             let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
      32            0 :             tokio::spawn(async move { tx.send(promote_fn().await) });
      33            0 :             rx
      34            0 :         };
      35              : 
      36              :         let mut task;
      37              :         // self.state is unlocked after block ends so we lock it in promote_impl
      38              :         // and task.changed() is reached
      39              :         {
      40            0 :             task = self
      41            0 :                 .state
      42            0 :                 .lock()
      43            0 :                 .unwrap()
      44            0 :                 .promote_state
      45            0 :                 .get_or_insert_with(start_promotion)
      46            0 :                 .clone()
      47              :         }
      48            0 :         task.changed().await.expect("promote sender dropped");
      49            0 :         task.borrow().clone()
      50            0 :     }
      51              : 
      52            0 :     async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
      53              :         {
      54            0 :             let state = self.state.lock().unwrap();
      55            0 :             let mode = &state.pspec.as_ref().unwrap().spec.mode;
      56            0 :             if *mode != ComputeMode::Replica {
      57            0 :                 bail!("{} is not replica", mode.to_type_str());
      58            0 :             }
      59              : 
      60              :             // we don't need to query Postgres so not self.lfc_prewarm_state()
      61            0 :             match &state.lfc_prewarm_state {
      62              :                 LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
      63            0 :                     bail!("prewarm not requested or pending")
      64              :                 }
      65            0 :                 LfcPrewarmState::Failed { error } => {
      66            0 :                     tracing::warn!(%error, "replica prewarm failed")
      67              :                 }
      68            0 :                 _ => {}
      69              :             }
      70              :         }
      71              : 
      72            0 :         let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
      73            0 :             .await
      74            0 :             .context("connecting to postgres")?;
      75              : 
      76            0 :         let primary_lsn = cfg.wal_flush_lsn;
      77            0 :         let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
      78              :         const RETRIES: i32 = 20;
      79            0 :         for i in 0..=RETRIES {
      80            0 :             let row = client
      81            0 :                 .query_one("SELECT pg_last_wal_replay_lsn()", &[])
      82            0 :                 .await
      83            0 :                 .context("getting last replay lsn")?;
      84            0 :             let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
      85            0 :             last_wal_replay_lsn = lsn.into();
      86            0 :             if last_wal_replay_lsn >= primary_lsn {
      87            0 :                 break;
      88            0 :             }
      89            0 :             info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
      90            0 :             sleep(Duration::from_secs(1)).await;
      91              :         }
      92            0 :         if last_wal_replay_lsn < primary_lsn {
      93            0 :             bail!("didn't catch up with primary in {RETRIES} retries");
      94            0 :         }
      95              : 
      96              :         // using $1 doesn't work with ALTER SYSTEM SET
      97            0 :         let safekeepers_sql = format!(
      98            0 :             "ALTER SYSTEM SET neon.safekeepers='{}'",
      99            0 :             cfg.spec.safekeeper_connstrings.join(",")
     100              :         );
     101            0 :         client
     102            0 :             .query(&safekeepers_sql, &[])
     103            0 :             .await
     104            0 :             .context("setting safekeepers")?;
     105            0 :         client
     106            0 :             .query("SELECT pg_reload_conf()", &[])
     107            0 :             .await
     108            0 :             .context("reloading postgres config")?;
     109              : 
     110              :         #[cfg(feature = "testing")]
     111            0 :         fail::fail_point!("compute-promotion", |_| {
     112            0 :             bail!("promotion configured to fail because of a failpoint")
     113            0 :         });
     114              : 
     115            0 :         let row = client
     116            0 :             .query_one("SELECT * FROM pg_promote()", &[])
     117            0 :             .await
     118            0 :             .context("pg_promote")?;
     119            0 :         if !row.get::<usize, bool>(0) {
     120            0 :             bail!("pg_promote() returned false");
     121            0 :         }
     122              : 
     123            0 :         let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
     124            0 :             .await
     125            0 :             .context("connecting to postgres")?;
     126            0 :         let row = client
     127            0 :             .query_one("SHOW transaction_read_only", &[])
     128            0 :             .await
     129            0 :             .context("getting transaction_read_only")?;
     130            0 :         if row.get::<usize, &str>(0) == "on" {
     131            0 :             bail!("replica in read only mode after promotion");
     132            0 :         }
     133              : 
     134            0 :         {
     135            0 :             let mut state = self.state.lock().unwrap();
     136            0 :             let spec = &mut state.pspec.as_mut().unwrap().spec;
     137            0 :             spec.mode = ComputeMode::Primary;
     138            0 :             let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
     139            0 :             let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
     140            0 :             Self::merge_spec(new_conf, existing_conf);
     141            0 :         }
     142            0 :         info!("applied new spec, reconfiguring as primary");
     143            0 :         self.reconfigure()
     144            0 :     }
     145              : 
     /// Merge old and new Postgres conf specs to apply on secondary.
     ///
     /// `new_conf` is rewritten in place: its `neon.safekeepers` entry is
     /// removed and its `port` is replaced by the one from `existing_conf`,
     /// since they are supplied differently. Both inputs are read as one
     /// `key=value` item per line; blank lines and `#` comment lines are
     /// ignored. When a key occurs more than once, the last occurrence wins.
     /// The result is emitted sorted by key, without a trailing newline, so
     /// the output is deterministic.
     ///
     /// # Panics
     /// Panics if a non-blank, non-comment line contains no `=`, or if
     /// `existing_conf` has no `port` item.
     fn merge_spec(new_conf: &mut String, existing_conf: &str) {
         /// Yields the `(key, value)` pair of every setting line in `conf`.
         fn settings(conf: &str) -> impl Iterator<Item = (&str, &str)> + '_ {
             conf.split_terminator('\n')
                 .filter(|line| {
                     let trimmed = line.trim();
                     !trimmed.is_empty() && !trimmed.starts_with('#')
                 })
                 .map(|line| line.split_once('=').expect("invalid item"))
         }

         // `last()` keeps the "last occurrence wins" rule for duplicated keys.
         let port = settings(existing_conf)
             .filter(|(key, _)| *key == "port")
             .last()
             .map(|(_, value)| value)
             .expect("existing postgresql.conf has no port setting");

         // Build the merged text in its own scope so every borrow of `new_conf`
         // has ended before it is overwritten below.
         let merged = {
             let mut merged_settings: std::collections::BTreeMap<&str, &str> =
                 settings(new_conf).collect();
             merged_settings.remove("neon.safekeepers");
             merged_settings.insert("port", port);
             let rendered: Vec<String> = merged_settings
                 .iter()
                 .map(|(key, value)| format!("{key}={value}"))
                 .collect();
             rendered.join("\n")
         };
         *new_conf = merged;
     }
     166              : }
        

Generated by: LCOV version 2.1-beta