Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, bail};
3 : use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
4 : use std::sync::Arc;
5 : use std::time::Instant;
6 : use tracing::info;
7 :
8 : impl ComputeNode {
9 : /// Returns only when promote fails or succeeds. If http client calling this function
10 : /// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
11 : /// Called by control plane on secondary after primary endpoint is terminated
12 : /// Has a failpoint "compute-promotion"
13 0 : pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
14 0 : let this = self.clone();
15 0 : let promote_fn = async move || match this.promote_impl(cfg).await {
16 0 : Ok(state) => state,
17 0 : Err(err) => {
18 0 : tracing::error!(%err, "promoting replica");
19 0 : let error = format!("{err:#}");
20 0 : PromoteState::Failed { error }
21 : }
22 0 : };
23 0 : let start_promotion = || {
24 0 : let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
25 0 : tokio::spawn(async move { tx.send(promote_fn().await) });
26 0 : rx
27 0 : };
28 :
29 : let mut task;
30 : // promote_impl locks self.state so we need to unlock it before calling task.changed()
31 : {
32 0 : let promote_state = &mut self.state.lock().unwrap().promote_state;
33 0 : task = promote_state.get_or_insert_with(start_promotion).clone()
34 : }
35 0 : if task.changed().await.is_err() {
36 0 : let error = "promote sender dropped".to_string();
37 0 : return PromoteState::Failed { error };
38 0 : }
39 0 : task.borrow().clone()
40 0 : }
41 :
42 0 : async fn promote_impl(self: &Arc<Self>, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
43 : #[allow(unused_mut)]
44 0 : let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
45 0 : let safekeepers_str = new_pspec.safekeeper_connstrings.join(",");
46 0 : if safekeepers_str.is_empty() {
47 0 : bail!("empty safekeepers list");
48 0 : }
49 :
50 : {
51 0 : let state = self.state.lock().unwrap();
52 0 : let mode = &state.pspec.as_ref().unwrap().spec.mode;
53 0 : if *mode != compute_api::spec::ComputeMode::Replica {
54 0 : bail!("compute mode \"{}\" is not replica", mode.to_type_str());
55 0 : }
56 0 : match &state.lfc_prewarm_state {
57 0 : status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
58 0 : bail!("compute {status}")
59 : }
60 0 : LfcPrewarmState::Failed { error } => {
61 0 : tracing::warn!(%error, "compute prewarm failed")
62 : }
63 0 : _ => {}
64 : }
65 : }
66 :
67 0 : let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
68 0 : .await
69 0 : .context("connecting to postgres")?;
70 0 : let mut now = Instant::now();
71 :
72 0 : let primary_lsn = cfg.wal_flush_lsn;
73 0 : let mut standby_lsn = utils::lsn::Lsn::INVALID;
74 : const RETRIES: i32 = 20;
75 0 : for i in 0..=RETRIES {
76 0 : let row = client
77 0 : .query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
78 0 : .await
79 0 : .context("getting last replay lsn")?;
80 0 : let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
81 0 : standby_lsn = lsn.into();
82 0 : if standby_lsn >= primary_lsn {
83 0 : break;
84 0 : }
85 0 : info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
86 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
87 : }
88 0 : if standby_lsn < primary_lsn {
89 0 : bail!("didn't catch up with primary in {RETRIES} retries");
90 0 : }
91 0 : let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
92 0 : now = Instant::now();
93 :
94 : // $1 doesn't work with ALTER SYSTEM SET
95 0 : let safekeepers_sql = format!("ALTER SYSTEM SET neon.safekeepers='{safekeepers_str}'");
96 0 : client
97 0 : .query(&safekeepers_sql, &[])
98 0 : .await
99 0 : .context("setting safekeepers")?;
100 0 : client
101 0 : .query(
102 0 : "ALTER SYSTEM SET synchronous_standby_names=walproposer",
103 0 : &[],
104 0 : )
105 0 : .await
106 0 : .context("setting synchronous_standby_names")?;
107 0 : client
108 0 : .query("SELECT pg_catalog.pg_reload_conf()", &[])
109 0 : .await
110 0 : .context("reloading postgres config")?;
111 :
112 : #[cfg(feature = "testing")]
113 0 : fail::fail_point!("compute-promotion", |_| bail!(
114 0 : "compute-promotion failpoint"
115 : ));
116 :
117 0 : let row = client
118 0 : .query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
119 0 : .await
120 0 : .context("pg_promote")?;
121 0 : if !row.get::<usize, bool>(0) {
122 0 : bail!("pg_promote() failed");
123 0 : }
124 0 : let pg_promote_time_ms = now.elapsed().as_millis() as u32;
125 0 : let now = Instant::now();
126 :
127 0 : let row = client
128 0 : .query_one("SHOW transaction_read_only", &[])
129 0 : .await
130 0 : .context("getting transaction_read_only")?;
131 0 : if row.get::<usize, &str>(0) == "on" {
132 0 : bail!("replica in read only mode after promotion");
133 0 : }
134 :
135 : // Already checked validity in http handler
136 : {
137 0 : let mut state = self.state.lock().unwrap();
138 :
139 : // Local setup has different ports for pg process (port=) for primary and secondary.
140 : // Primary is stopped so we need secondary's "port" value
141 : #[cfg(feature = "testing")]
142 : {
143 0 : let old_spec = &state.pspec.as_ref().unwrap().spec;
144 0 : let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
145 0 : bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
146 : };
147 0 : let set: std::collections::HashMap<&str, &str> = old_conf
148 0 : .split_terminator('\n')
149 0 : .map(|e| e.split_once("=").expect("invalid item"))
150 0 : .collect();
151 :
152 0 : let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
153 0 : bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
154 : };
155 0 : new_conf.push_str(&format!("port={}\n", set["port"]));
156 : }
157 :
158 0 : tracing::debug!("applied spec: {:#?}", new_pspec.spec);
159 0 : if self.params.lakebase_mode {
160 0 : ComputeNode::set_spec(&self.params, &mut state, new_pspec);
161 0 : } else {
162 0 : state.pspec = Some(new_pspec);
163 0 : }
164 : }
165 :
166 0 : info!("applied new spec, reconfiguring as primary");
167 : // reconfigure calls apply_spec_sql which blocks on a current runtime. To avoid panicking
168 : // due to nested runtimes, wait on this task in a blocking way
169 0 : let this = self.clone();
170 0 : tokio::task::spawn_blocking(move || this.reconfigure()).await??;
171 0 : let reconfigure_time_ms = now.elapsed().as_millis() as u32;
172 :
173 0 : Ok(PromoteState::Completed {
174 0 : lsn_wait_time_ms,
175 0 : pg_promote_time_ms,
176 0 : reconfigure_time_ms,
177 0 : })
178 0 : }
179 : }
|