Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, bail};
3 : use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
4 : use std::time::Instant;
5 : use tracing::info;
6 :
7 : impl ComputeNode {
8 : /// Returns only when promote fails or succeeds. If http client calling this function
9 : /// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
10 : /// Called by control plane on secondary after primary endpoint is terminated
11 : /// Has a failpoint "compute-promotion"
12 0 : pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
13 0 : let this = self.clone();
14 0 : let promote_fn = async move || match this.promote_impl(cfg).await {
15 0 : Ok(state) => state,
16 0 : Err(err) => {
17 0 : tracing::error!(%err, "promoting replica");
18 0 : let error = format!("{err:#}");
19 0 : PromoteState::Failed { error }
20 : }
21 0 : };
22 0 : let start_promotion = || {
23 0 : let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
24 0 : tokio::spawn(async move { tx.send(promote_fn().await) });
25 0 : rx
26 0 : };
27 :
28 : let mut task;
29 : // promote_impl locks self.state so we need to unlock it before calling task.changed()
30 : {
31 0 : let promote_state = &mut self.state.lock().unwrap().promote_state;
32 0 : task = promote_state.get_or_insert_with(start_promotion).clone()
33 : }
34 0 : if task.changed().await.is_err() {
35 0 : let error = "promote sender dropped".to_string();
36 0 : return PromoteState::Failed { error };
37 0 : }
38 0 : task.borrow().clone()
39 0 : }
40 :
41 0 : async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
42 : {
43 0 : let state = self.state.lock().unwrap();
44 0 : let mode = &state.pspec.as_ref().unwrap().spec.mode;
45 0 : if *mode != compute_api::spec::ComputeMode::Replica {
46 0 : bail!("compute mode \"{}\" is not replica", mode.to_type_str());
47 0 : }
48 0 : match &state.lfc_prewarm_state {
49 0 : status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
50 0 : bail!("compute {status}")
51 : }
52 0 : LfcPrewarmState::Failed { error } => {
53 0 : tracing::warn!(%error, "compute prewarm failed")
54 : }
55 0 : _ => {}
56 : }
57 : }
58 :
59 0 : let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
60 0 : .await
61 0 : .context("connecting to postgres")?;
62 0 : let mut now = Instant::now();
63 :
64 0 : let primary_lsn = cfg.wal_flush_lsn;
65 0 : let mut standby_lsn = utils::lsn::Lsn::INVALID;
66 : const RETRIES: i32 = 20;
67 0 : for i in 0..=RETRIES {
68 0 : let row = client
69 0 : .query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
70 0 : .await
71 0 : .context("getting last replay lsn")?;
72 0 : let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
73 0 : standby_lsn = lsn.into();
74 0 : if standby_lsn >= primary_lsn {
75 0 : break;
76 0 : }
77 0 : info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
78 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
79 : }
80 0 : if standby_lsn < primary_lsn {
81 0 : bail!("didn't catch up with primary in {RETRIES} retries");
82 0 : }
83 0 : let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
84 0 : now = Instant::now();
85 :
86 : // using $1 doesn't work with ALTER SYSTEM SET
87 0 : let safekeepers_sql = format!(
88 0 : "ALTER SYSTEM SET neon.safekeepers='{}'",
89 0 : cfg.spec.safekeeper_connstrings.join(",")
90 : );
91 0 : client
92 0 : .query(&safekeepers_sql, &[])
93 0 : .await
94 0 : .context("setting safekeepers")?;
95 0 : client
96 0 : .query(
97 0 : "ALTER SYSTEM SET synchronous_standby_names=walproposer",
98 0 : &[],
99 0 : )
100 0 : .await
101 0 : .context("setting synchronous_standby_names")?;
102 0 : client
103 0 : .query("SELECT pg_catalog.pg_reload_conf()", &[])
104 0 : .await
105 0 : .context("reloading postgres config")?;
106 :
107 : #[cfg(feature = "testing")]
108 0 : fail::fail_point!("compute-promotion", |_| bail!(
109 0 : "compute-promotion failpoint"
110 : ));
111 :
112 0 : let row = client
113 0 : .query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
114 0 : .await
115 0 : .context("pg_promote")?;
116 0 : if !row.get::<usize, bool>(0) {
117 0 : bail!("pg_promote() failed");
118 0 : }
119 0 : let pg_promote_time_ms = now.elapsed().as_millis() as u32;
120 0 : let now = Instant::now();
121 :
122 0 : let row = client
123 0 : .query_one("SHOW transaction_read_only", &[])
124 0 : .await
125 0 : .context("getting transaction_read_only")?;
126 0 : if row.get::<usize, &str>(0) == "on" {
127 0 : bail!("replica in read only mode after promotion");
128 0 : }
129 :
130 : // Already checked validity in http handler
131 : #[allow(unused_mut)]
132 0 : let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
133 : {
134 0 : let mut state = self.state.lock().unwrap();
135 :
136 : // Local setup has different ports for pg process (port=) for primary and secondary.
137 : // Primary is stopped so we need secondary's "port" value
138 : #[cfg(feature = "testing")]
139 : {
140 0 : let old_spec = &state.pspec.as_ref().unwrap().spec;
141 0 : let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
142 0 : bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
143 : };
144 0 : let set: std::collections::HashMap<&str, &str> = old_conf
145 0 : .split_terminator('\n')
146 0 : .map(|e| e.split_once("=").expect("invalid item"))
147 0 : .collect();
148 :
149 0 : let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
150 0 : bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
151 : };
152 0 : new_conf.push_str(&format!("port={}\n", set["port"]));
153 : }
154 :
155 0 : tracing::debug!("applied spec: {:#?}", new_pspec.spec);
156 0 : if self.params.lakebase_mode {
157 0 : ComputeNode::set_spec(&self.params, &mut state, new_pspec);
158 0 : } else {
159 0 : state.pspec = Some(new_pspec);
160 0 : }
161 : }
162 :
163 0 : info!("applied new spec, reconfiguring as primary");
164 0 : self.reconfigure()?;
165 0 : let reconfigure_time_ms = now.elapsed().as_millis() as u32;
166 :
167 0 : Ok(PromoteState::Completed {
168 0 : lsn_wait_time_ms,
169 0 : pg_promote_time_ms,
170 0 : reconfigure_time_ms,
171 0 : })
172 0 : }
173 : }
|