Line data Source code
1 : use std::fs::File;
2 : use std::path::Path;
3 :
4 : use anyhow::{Result, anyhow, bail};
5 : use compute_api::responses::{
6 : ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
7 : };
8 : use reqwest::StatusCode;
9 : use tokio_postgres::Client;
10 : use tracing::{error, info, instrument};
11 :
12 : use crate::compute::ComputeNodeParams;
13 : use crate::config;
14 : use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
15 : use crate::migration::MigrationRunner;
16 : use crate::params::PG_HBA_ALL_MD5;
17 :
18 : // Do control plane request and return response if any. In case of error it
19 : // returns a bool flag indicating whether it makes sense to retry the request
20 : // and a string with error message.
21 0 : fn do_control_plane_request(
22 0 : uri: &str,
23 0 : jwt: &str,
24 0 : ) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
25 0 : let resp = reqwest::blocking::Client::new()
26 0 : .get(uri)
27 0 : .header("Authorization", format!("Bearer {jwt}"))
28 0 : .send()
29 0 : .map_err(|e| {
30 0 : (
31 0 : true,
32 0 : format!("could not perform request to control plane: {e:?}"),
33 0 : UNKNOWN_HTTP_STATUS.to_string(),
34 0 : )
35 0 : })?;
36 :
37 0 : let status = resp.status();
38 0 : match status {
39 0 : StatusCode::OK => match resp.json::<ControlPlaneConfigResponse>() {
40 0 : Ok(spec_resp) => Ok(spec_resp),
41 0 : Err(e) => Err((
42 0 : true,
43 0 : format!("could not deserialize control plane response: {e:?}"),
44 0 : status.to_string(),
45 0 : )),
46 : },
47 0 : StatusCode::SERVICE_UNAVAILABLE => Err((
48 0 : true,
49 0 : "control plane is temporarily unavailable".to_string(),
50 0 : status.to_string(),
51 0 : )),
52 : StatusCode::BAD_GATEWAY => {
53 : // We have a problem with intermittent 502 errors now
54 : // https://github.com/neondatabase/cloud/issues/2353
55 : // It's fine to retry GET request in this case.
56 0 : Err((
57 0 : true,
58 0 : "control plane request failed with 502".to_string(),
59 0 : status.to_string(),
60 0 : ))
61 : }
62 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
63 : // or some internal failure happened. Doesn't make much sense to retry in this case.
64 0 : _ => Err((
65 0 : false,
66 0 : format!("unexpected control plane response status code: {status}"),
67 0 : status.to_string(),
68 0 : )),
69 : }
70 0 : }
71 :
72 : /// Request config from the control-plane by compute_id. If
73 : /// `NEON_CONTROL_PLANE_TOKEN` env variable is set, it will be used for
74 : /// authorization.
75 0 : pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeConfig> {
76 0 : let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
77 0 : let jwt: String = std::env::var("NEON_CONTROL_PLANE_TOKEN").unwrap_or_default();
78 0 : let mut attempt = 1;
79 :
80 0 : info!("getting config from control plane: {}", cp_uri);
81 :
82 : // Do 3 attempts to get spec from the control plane using the following logic:
83 : // - network error -> then retry
84 : // - compute id is unknown or any other error -> bail out
85 : // - no spec for compute yet (Empty state) -> return Ok(None)
86 : // - got config -> return Ok(Some(config))
87 0 : while attempt < 4 {
88 0 : let result = match do_control_plane_request(&cp_uri, &jwt) {
89 0 : Ok(config_resp) => {
90 0 : CPLANE_REQUESTS_TOTAL
91 0 : .with_label_values(&[
92 0 : CPlaneRequestRPC::GetConfig.as_str(),
93 0 : &StatusCode::OK.to_string(),
94 0 : ])
95 0 : .inc();
96 0 : match config_resp.status {
97 0 : ControlPlaneComputeStatus::Empty => Ok(config_resp.into()),
98 : ControlPlaneComputeStatus::Attached => {
99 0 : if config_resp.spec.is_some() {
100 0 : Ok(config_resp.into())
101 : } else {
102 0 : bail!("compute is attached, but spec is empty")
103 : }
104 : }
105 : }
106 : }
107 0 : Err((retry, msg, status)) => {
108 0 : CPLANE_REQUESTS_TOTAL
109 0 : .with_label_values(&[CPlaneRequestRPC::GetConfig.as_str(), &status])
110 0 : .inc();
111 0 : if retry {
112 0 : Err(anyhow!(msg))
113 : } else {
114 0 : bail!(msg);
115 : }
116 : }
117 : };
118 :
119 0 : if let Err(e) = &result {
120 0 : error!("attempt {} to get config failed with: {}", attempt, e);
121 : } else {
122 0 : return result;
123 : }
124 :
125 0 : attempt += 1;
126 0 : std::thread::sleep(std::time::Duration::from_millis(100));
127 : }
128 :
129 : // All attempts failed, return error.
130 0 : Err(anyhow::anyhow!(
131 0 : "Exhausted all attempts to retrieve the config from the control plane"
132 0 : ))
133 0 : }
134 :
135 : /// Check `pg_hba.conf` and update if needed to allow external connections.
136 0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
137 : // XXX: consider making it a part of config.json
138 0 : let pghba_path = pgdata_path.join("pg_hba.conf");
139 :
140 0 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
141 0 : info!("updated pg_hba.conf to allow external connections");
142 : } else {
143 0 : info!("pg_hba.conf is up-to-date");
144 : }
145 :
146 0 : Ok(())
147 0 : }
148 :
149 : /// Create a standby.signal file
150 0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
151 : // XXX: consider making it a part of config.json
152 0 : let signalfile = pgdata_path.join("standby.signal");
153 :
154 0 : if !signalfile.exists() {
155 0 : File::create(signalfile)?;
156 0 : info!("created standby.signal");
157 : } else {
158 0 : info!("reused pre-existing standby.signal");
159 : }
160 0 : Ok(())
161 0 : }
162 :
163 : #[instrument(skip_all)]
164 : pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
165 : let query = "ALTER EXTENSION neon UPDATE";
166 : info!("update neon extension version with query: {}", query);
167 : client.simple_query(query).await?;
168 :
169 : Ok(())
170 : }
171 :
172 : #[instrument(skip_all)]
173 : pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -> Result<()> {
174 : info!("handle migrations");
175 :
176 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
177 : // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
178 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
179 :
180 : // Add new migrations in numerical order.
181 : let migrations = [
182 : &format!(
183 : include_str!("./migrations/0001-add_bypass_rls_to_privileged_role.sql"),
184 : privileged_role_name = params.privileged_role_name
185 : ),
186 : &format!(
187 : include_str!("./migrations/0002-alter_roles.sql"),
188 : privileged_role_name = params.privileged_role_name
189 : ),
190 : &format!(
191 : include_str!("./migrations/0003-grant_pg_create_subscription_to_privileged_role.sql"),
192 : privileged_role_name = params.privileged_role_name
193 : ),
194 : &format!(
195 : include_str!("./migrations/0004-grant_pg_monitor_to_privileged_role.sql"),
196 : privileged_role_name = params.privileged_role_name
197 : ),
198 : &format!(
199 : include_str!("./migrations/0005-grant_all_on_tables_to_privileged_role.sql"),
200 : privileged_role_name = params.privileged_role_name
201 : ),
202 : &format!(
203 : include_str!("./migrations/0006-grant_all_on_sequences_to_privileged_role.sql"),
204 : privileged_role_name = params.privileged_role_name
205 : ),
206 : &format!(
207 : include_str!(
208 : "./migrations/0007-grant_all_on_tables_with_grant_option_to_privileged_role.sql"
209 : ),
210 : privileged_role_name = params.privileged_role_name
211 : ),
212 : &format!(
213 : include_str!(
214 : "./migrations/0008-grant_all_on_sequences_with_grant_option_to_privileged_role.sql"
215 : ),
216 : privileged_role_name = params.privileged_role_name
217 : ),
218 : include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
219 : &format!(
220 : include_str!(
221 : "./migrations/0010-grant_snapshot_synchronization_funcs_to_privileged_role.sql"
222 : ),
223 : privileged_role_name = params.privileged_role_name
224 : ),
225 : &format!(
226 : include_str!(
227 : "./migrations/0011-grant_pg_show_replication_origin_status_to_privileged_role.sql"
228 : ),
229 : privileged_role_name = params.privileged_role_name
230 : ),
231 : &format!(
232 : include_str!("./migrations/0012-grant_pg_signal_backend_to_privileged_role.sql"),
233 : privileged_role_name = params.privileged_role_name
234 : ),
235 : ];
236 :
237 : MigrationRunner::new(client, &migrations)
238 : .run_migrations()
239 : .await?;
240 :
241 : Ok(())
242 : }
|