Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, Result, bail};
3 : use compute_api::{
4 : responses::{LfcPrewarmState, PromoteState, SafekeepersLsn},
5 : spec::ComputeMode,
6 : };
7 : use std::{sync::Arc, time::Duration};
8 : use tokio::time::sleep;
9 : use utils::lsn::Lsn;
10 :
11 : impl ComputeNode {
12 : /// Returns only when promote fails or succeeds. If a network error occurs
13 : /// and http client disconnects, this does not stop promotion, and subsequent
14 : /// calls block until promote finishes.
15 : /// Called by control plane on secondary after primary endpoint is terminated
16 0 : pub async fn promote(self: &Arc<Self>, safekeepers_lsn: SafekeepersLsn) -> PromoteState {
17 0 : let cloned = self.clone();
18 0 : let start_promotion = || {
19 0 : let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
20 0 : tokio::spawn(async move {
21 0 : tx.send(match cloned.promote_impl(safekeepers_lsn).await {
22 0 : Ok(_) => PromoteState::Completed,
23 0 : Err(err) => {
24 0 : tracing::error!(%err, "promoting");
25 0 : PromoteState::Failed {
26 0 : error: err.to_string(),
27 0 : }
28 : }
29 : })
30 0 : });
31 0 : rx
32 0 : };
33 :
34 : let mut task;
35 : // self.state is unlocked after block ends so we lock it in promote_impl
36 : // and task.changed() is reached
37 : {
38 0 : task = self
39 0 : .state
40 0 : .lock()
41 0 : .unwrap()
42 0 : .promote_state
43 0 : .get_or_insert_with(start_promotion)
44 0 : .clone()
45 : }
46 0 : task.changed().await.expect("promote sender dropped");
47 0 : task.borrow().clone()
48 0 : }
49 :
50 : // Why do we have to supply safekeepers?
51 : // For secondary we use primary_connection_conninfo so safekeepers field is empty
52 0 : async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> {
53 : {
54 0 : let state = self.state.lock().unwrap();
55 0 : let mode = &state.pspec.as_ref().unwrap().spec.mode;
56 0 : if *mode != ComputeMode::Replica {
57 0 : bail!("{} is not replica", mode.to_type_str());
58 0 : }
59 :
60 : // we don't need to query Postgres so not self.lfc_prewarm_state()
61 0 : match &state.lfc_prewarm_state {
62 : LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
63 0 : bail!("prewarm not requested or pending")
64 : }
65 0 : LfcPrewarmState::Failed { error } => {
66 0 : tracing::warn!(%error, "replica prewarm failed")
67 : }
68 0 : _ => {}
69 : }
70 : }
71 :
72 0 : let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
73 0 : .await
74 0 : .context("connecting to postgres")?;
75 :
76 0 : let primary_lsn = safekeepers_lsn.wal_flush_lsn;
77 0 : let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
78 : const RETRIES: i32 = 20;
79 0 : for i in 0..=RETRIES {
80 0 : let row = client
81 0 : .query_one("SELECT pg_last_wal_replay_lsn()", &[])
82 0 : .await
83 0 : .context("getting last replay lsn")?;
84 0 : let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
85 0 : last_wal_replay_lsn = lsn.into();
86 0 : if last_wal_replay_lsn >= primary_lsn {
87 0 : break;
88 0 : }
89 0 : tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
90 0 : sleep(Duration::from_secs(1)).await;
91 : }
92 0 : if last_wal_replay_lsn < primary_lsn {
93 0 : bail!("didn't catch up with primary in {RETRIES} retries");
94 0 : }
95 :
96 : // using $1 doesn't work with ALTER SYSTEM SET
97 0 : let safekeepers_sql = format!(
98 0 : "ALTER SYSTEM SET neon.safekeepers='{}'",
99 : safekeepers_lsn.safekeepers
100 : );
101 0 : client
102 0 : .query(&safekeepers_sql, &[])
103 0 : .await
104 0 : .context("setting safekeepers")?;
105 0 : client
106 0 : .query("SELECT pg_reload_conf()", &[])
107 0 : .await
108 0 : .context("reloading postgres config")?;
109 0 : let row = client
110 0 : .query_one("SELECT * FROM pg_promote()", &[])
111 0 : .await
112 0 : .context("pg_promote")?;
113 0 : if !row.get::<usize, bool>(0) {
114 0 : bail!("pg_promote() returned false");
115 0 : }
116 :
117 0 : let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
118 0 : .await
119 0 : .context("connecting to postgres")?;
120 0 : let row = client
121 0 : .query_one("SHOW transaction_read_only", &[])
122 0 : .await
123 0 : .context("getting transaction_read_only")?;
124 0 : if row.get::<usize, &str>(0) == "on" {
125 0 : bail!("replica in read only mode after promotion");
126 0 : }
127 :
128 0 : let mut state = self.state.lock().unwrap();
129 0 : state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary;
130 0 : Ok(())
131 0 : }
132 : }
|