LCOV - code coverage report
Current view: top level - compute_tools/src - compute_promote.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 87 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 6 0

            Line data    Source code
       1              : use crate::compute::ComputeNode;
       2              : use anyhow::{Context, Result, bail};
       3              : use compute_api::{
       4              :     responses::{LfcPrewarmState, PromoteState, SafekeepersLsn},
       5              :     spec::ComputeMode,
       6              : };
       7              : use std::{sync::Arc, time::Duration};
       8              : use tokio::time::sleep;
       9              : use utils::lsn::Lsn;
      10              : 
      11              : impl ComputeNode {
      12              :     /// Returns only when promote fails or succeeds. If a network error occurs
      13              :     /// and http client disconnects, this does not stop promotion, and subsequent
      14              :     /// calls block until promote finishes.
      15              :     /// Called by control plane on secondary after primary endpoint is terminated
      16            0 :     pub async fn promote(self: &Arc<Self>, safekeepers_lsn: SafekeepersLsn) -> PromoteState {
      17            0 :         let cloned = self.clone();
      18            0 :         let start_promotion = || {
      19            0 :             let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
      20            0 :             tokio::spawn(async move {
      21            0 :                 tx.send(match cloned.promote_impl(safekeepers_lsn).await {
      22            0 :                     Ok(_) => PromoteState::Completed,
      23            0 :                     Err(err) => {
      24            0 :                         tracing::error!(%err, "promoting");
      25            0 :                         PromoteState::Failed {
      26            0 :                             error: err.to_string(),
      27            0 :                         }
      28              :                     }
      29              :                 })
      30            0 :             });
      31            0 :             rx
      32            0 :         };
      33              : 
      34              :         let mut task;
      35              :         // self.state is unlocked after block ends so we lock it in promote_impl
      36              :         // and task.changed() is reached
      37              :         {
      38            0 :             task = self
      39            0 :                 .state
      40            0 :                 .lock()
      41            0 :                 .unwrap()
      42            0 :                 .promote_state
      43            0 :                 .get_or_insert_with(start_promotion)
      44            0 :                 .clone()
      45              :         }
      46            0 :         task.changed().await.expect("promote sender dropped");
      47            0 :         task.borrow().clone()
      48            0 :     }
      49              : 
      50              :     // Why do we have to supply safekeepers?
      51              :     // For secondary we use primary_connection_conninfo so safekeepers field is empty
      52            0 :     async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> {
      53              :         {
      54            0 :             let state = self.state.lock().unwrap();
      55            0 :             let mode = &state.pspec.as_ref().unwrap().spec.mode;
      56            0 :             if *mode != ComputeMode::Replica {
      57            0 :                 bail!("{} is not replica", mode.to_type_str());
      58            0 :             }
      59              : 
      60              :             // we don't need to query Postgres so not self.lfc_prewarm_state()
      61            0 :             match &state.lfc_prewarm_state {
      62              :                 LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
      63            0 :                     bail!("prewarm not requested or pending")
      64              :                 }
      65            0 :                 LfcPrewarmState::Failed { error } => {
      66            0 :                     tracing::warn!(%error, "replica prewarm failed")
      67              :                 }
      68            0 :                 _ => {}
      69              :             }
      70              :         }
      71              : 
      72            0 :         let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
      73            0 :             .await
      74            0 :             .context("connecting to postgres")?;
      75              : 
      76            0 :         let primary_lsn = safekeepers_lsn.wal_flush_lsn;
      77            0 :         let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
      78              :         const RETRIES: i32 = 20;
      79            0 :         for i in 0..=RETRIES {
      80            0 :             let row = client
      81            0 :                 .query_one("SELECT pg_last_wal_replay_lsn()", &[])
      82            0 :                 .await
      83            0 :                 .context("getting last replay lsn")?;
      84            0 :             let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
      85            0 :             last_wal_replay_lsn = lsn.into();
      86            0 :             if last_wal_replay_lsn >= primary_lsn {
      87            0 :                 break;
      88            0 :             }
      89            0 :             tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
      90            0 :             sleep(Duration::from_secs(1)).await;
      91              :         }
      92            0 :         if last_wal_replay_lsn < primary_lsn {
      93            0 :             bail!("didn't catch up with primary in {RETRIES} retries");
      94            0 :         }
      95              : 
      96              :         // using $1 doesn't work with ALTER SYSTEM SET
      97            0 :         let safekeepers_sql = format!(
      98            0 :             "ALTER SYSTEM SET neon.safekeepers='{}'",
      99              :             safekeepers_lsn.safekeepers
     100              :         );
     101            0 :         client
     102            0 :             .query(&safekeepers_sql, &[])
     103            0 :             .await
     104            0 :             .context("setting safekeepers")?;
     105            0 :         client
     106            0 :             .query("SELECT pg_reload_conf()", &[])
     107            0 :             .await
     108            0 :             .context("reloading postgres config")?;
     109            0 :         let row = client
     110            0 :             .query_one("SELECT * FROM pg_promote()", &[])
     111            0 :             .await
     112            0 :             .context("pg_promote")?;
     113            0 :         if !row.get::<usize, bool>(0) {
     114            0 :             bail!("pg_promote() returned false");
     115            0 :         }
     116              : 
     117            0 :         let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
     118            0 :             .await
     119            0 :             .context("connecting to postgres")?;
     120            0 :         let row = client
     121            0 :             .query_one("SHOW transaction_read_only", &[])
     122            0 :             .await
     123            0 :             .context("getting transaction_read_only")?;
     124            0 :         if row.get::<usize, &str>(0) == "on" {
     125            0 :             bail!("replica in read only mode after promotion");
     126            0 :         }
     127              : 
     128            0 :         let mut state = self.state.lock().unwrap();
     129            0 :         state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary;
     130            0 :         Ok(())
     131            0 :     }
     132              : }
        

Generated by: LCOV version 2.1-beta