Line data Source code
1 : use std::fs::File;
2 : use std::fs::{self, Permissions};
3 : use std::os::unix::fs::PermissionsExt;
4 : use std::path::Path;
5 :
6 : use anyhow::{Result, anyhow, bail};
7 : use compute_api::responses::{
8 : ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
9 : };
10 : use reqwest::StatusCode;
11 : use tokio_postgres::Client;
12 : use tracing::{error, info, instrument};
13 :
14 : use crate::compute::ComputeNodeParams;
15 : use crate::config;
16 : use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
17 : use crate::migration::MigrationRunner;
18 : use crate::params::PG_HBA_ALL_MD5;
19 :
20 : // Do control plane request and return response if any. In case of error it
21 : // returns a bool flag indicating whether it makes sense to retry the request
22 : // and a string with error message.
23 0 : fn do_control_plane_request(
24 0 : uri: &str,
25 0 : jwt: &str,
26 0 : ) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
27 0 : let resp = reqwest::blocking::Client::new()
28 0 : .get(uri)
29 0 : .header("Authorization", format!("Bearer {jwt}"))
30 0 : .send()
31 0 : .map_err(|e| {
32 0 : (
33 0 : true,
34 0 : format!("could not perform request to control plane: {e:?}"),
35 0 : UNKNOWN_HTTP_STATUS.to_string(),
36 0 : )
37 0 : })?;
38 :
39 0 : let status = resp.status();
40 0 : match status {
41 0 : StatusCode::OK => match resp.json::<ControlPlaneConfigResponse>() {
42 0 : Ok(spec_resp) => Ok(spec_resp),
43 0 : Err(e) => Err((
44 0 : true,
45 0 : format!("could not deserialize control plane response: {e:?}"),
46 0 : status.to_string(),
47 0 : )),
48 : },
49 0 : StatusCode::SERVICE_UNAVAILABLE => Err((
50 0 : true,
51 0 : "control plane is temporarily unavailable".to_string(),
52 0 : status.to_string(),
53 0 : )),
54 : StatusCode::BAD_GATEWAY => {
55 : // We have a problem with intermittent 502 errors now
56 : // https://github.com/neondatabase/cloud/issues/2353
57 : // It's fine to retry GET request in this case.
58 0 : Err((
59 0 : true,
60 0 : "control plane request failed with 502".to_string(),
61 0 : status.to_string(),
62 0 : ))
63 : }
64 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
65 : // or some internal failure happened. Doesn't make much sense to retry in this case.
66 0 : _ => Err((
67 0 : false,
68 0 : format!("unexpected control plane response status code: {status}"),
69 0 : status.to_string(),
70 0 : )),
71 : }
72 0 : }
73 :
74 : /// Request config from the control-plane by compute_id. If
75 : /// `NEON_CONTROL_PLANE_TOKEN` env variable is set, it will be used for
76 : /// authorization.
77 0 : pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeConfig> {
78 0 : let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
79 0 : let jwt: String = std::env::var("NEON_CONTROL_PLANE_TOKEN").unwrap_or_default();
80 0 : let mut attempt = 1;
81 :
82 0 : info!("getting config from control plane: {}", cp_uri);
83 :
84 : // Do 3 attempts to get spec from the control plane using the following logic:
85 : // - network error -> then retry
86 : // - compute id is unknown or any other error -> bail out
87 : // - no spec for compute yet (Empty state) -> return Ok(None)
88 : // - got config -> return Ok(Some(config))
89 0 : while attempt < 4 {
90 0 : let result = match do_control_plane_request(&cp_uri, &jwt) {
91 0 : Ok(config_resp) => {
92 0 : CPLANE_REQUESTS_TOTAL
93 0 : .with_label_values(&[
94 0 : CPlaneRequestRPC::GetConfig.as_str(),
95 0 : &StatusCode::OK.to_string(),
96 0 : ])
97 0 : .inc();
98 0 : match config_resp.status {
99 0 : ControlPlaneComputeStatus::Empty => Ok(config_resp.into()),
100 : ControlPlaneComputeStatus::Attached => {
101 0 : if config_resp.spec.is_some() {
102 0 : Ok(config_resp.into())
103 : } else {
104 0 : bail!("compute is attached, but spec is empty")
105 : }
106 : }
107 : }
108 : }
109 0 : Err((retry, msg, status)) => {
110 0 : CPLANE_REQUESTS_TOTAL
111 0 : .with_label_values(&[CPlaneRequestRPC::GetConfig.as_str(), &status])
112 0 : .inc();
113 0 : if retry {
114 0 : Err(anyhow!(msg))
115 : } else {
116 0 : bail!(msg);
117 : }
118 : }
119 : };
120 :
121 0 : if let Err(e) = &result {
122 0 : error!("attempt {} to get config failed with: {}", attempt, e);
123 : } else {
124 0 : return result;
125 : }
126 :
127 0 : attempt += 1;
128 0 : std::thread::sleep(std::time::Duration::from_millis(100));
129 : }
130 :
131 : // All attempts failed, return error.
132 0 : Err(anyhow::anyhow!(
133 0 : "Exhausted all attempts to retrieve the config from the control plane"
134 0 : ))
135 0 : }
136 :
137 : /// Check `pg_hba.conf` and update if needed to allow external connections.
138 0 : pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) -> Result<()> {
139 : // XXX: consider making it a part of config.json
140 0 : let pghba_path = pgdata_path.join("pg_hba.conf");
141 :
142 : // Update pg_hba to contains databricks specfic settings before adding neon settings
143 : // PG uses the first record that matches to perform authentication, so we need to have
144 : // our rules before the default ones from neon.
145 : // See https://www.postgresql.org/docs/current/auth-pg-hba-conf.html
146 0 : if let Some(databricks_pg_hba) = databricks_pg_hba {
147 0 : if config::line_in_file(
148 0 : &pghba_path,
149 0 : &format!("include_if_exists {}\n", *databricks_pg_hba),
150 0 : )? {
151 0 : info!("updated pg_hba.conf to include databricks_pg_hba.conf");
152 : } else {
153 0 : info!("pg_hba.conf already included databricks_pg_hba.conf");
154 : }
155 0 : }
156 :
157 0 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
158 0 : info!("updated pg_hba.conf to allow external connections");
159 : } else {
160 0 : info!("pg_hba.conf is up-to-date");
161 : }
162 :
163 0 : Ok(())
164 0 : }
165 :
166 : /// Check `pg_ident.conf` and update if needed to allow databricks config.
167 0 : pub fn update_pg_ident(pgdata_path: &Path, databricks_pg_ident: Option<&String>) -> Result<()> {
168 0 : info!("checking pg_ident.conf");
169 0 : let pghba_path = pgdata_path.join("pg_ident.conf");
170 :
171 : // Update pg_ident to contains databricks specfic settings
172 0 : if let Some(databricks_pg_ident) = databricks_pg_ident {
173 0 : if config::line_in_file(
174 0 : &pghba_path,
175 0 : &format!("include_if_exists {}\n", *databricks_pg_ident),
176 0 : )? {
177 0 : info!("updated pg_ident.conf to include databricks_pg_ident.conf");
178 : } else {
179 0 : info!("pg_ident.conf already included databricks_pg_ident.conf");
180 : }
181 0 : }
182 :
183 0 : Ok(())
184 0 : }
185 :
186 : /// Copy tls key_file and cert_file from k8s secret mount directory
187 : /// to pgdata and set private key file permissions as expected by Postgres.
188 : /// See this doc for expected permission <https://www.postgresql.org/docs/current/ssl-tcp.html>
189 : /// K8s secrets mount on dblet does not honor permission and ownership
190 : /// specified in the Volume or VolumeMount. So we need to explicitly copy the file and set the permissions.
191 0 : pub fn copy_tls_certificates(
192 0 : key_file: &String,
193 0 : cert_file: &String,
194 0 : pgdata_path: &Path,
195 0 : ) -> Result<()> {
196 0 : let files = [cert_file, key_file];
197 0 : for file in files.iter() {
198 0 : let source = Path::new(file);
199 0 : let dest = pgdata_path.join(source.file_name().unwrap());
200 0 : if !dest.exists() {
201 0 : std::fs::copy(source, &dest)?;
202 0 : info!(
203 0 : "Copying tls file: {} to {}",
204 0 : &source.display(),
205 0 : &dest.display()
206 : );
207 0 : }
208 0 : if *file == key_file {
209 : // Postgres requires private key to be readable only by the owner by having
210 : // chmod 600 permissions.
211 0 : let permissions = Permissions::from_mode(0o600);
212 0 : fs::set_permissions(&dest, permissions)?;
213 0 : info!("Setting permission on {}.", &dest.display());
214 0 : }
215 : }
216 0 : Ok(())
217 0 : }
218 :
219 : /// Create a standby.signal file
220 0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
221 : // XXX: consider making it a part of config.json
222 0 : let signalfile = pgdata_path.join("standby.signal");
223 :
224 0 : if !signalfile.exists() {
225 0 : File::create(signalfile)?;
226 0 : info!("created standby.signal");
227 : } else {
228 0 : info!("reused pre-existing standby.signal");
229 : }
230 0 : Ok(())
231 0 : }
232 :
233 : #[instrument(skip_all)]
234 : pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
235 : let query = "ALTER EXTENSION neon UPDATE";
236 : info!("update neon extension version with query: {}", query);
237 : client.simple_query(query).await?;
238 :
239 : Ok(())
240 : }
241 :
/// Build the ordered list of compute SQL migrations and run them with [`MigrationRunner`].
///
/// Most migration files are `format!` templates. The `{privileged_role_name}` placeholder
/// is substituted with `params.privileged_role_name`. Migration 0009 is used verbatim.
/// `lakebase_mode` is passed through to `MigrationRunner::new`.
/// NOTE(review): its exact effect is defined by `MigrationRunner`; confirm there.
///
/// # Errors
/// Propagates any error returned by `MigrationRunner::run_migrations`.
#[instrument(skip_all)]
pub async fn handle_migrations(
    params: ComputeNodeParams,
    client: &mut Client,
    lakebase_mode: bool,
) -> Result<()> {
    info!("handle migrations");

    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    // NOTE(review): presumably `MigrationRunner` identifies migrations by their position in
    // this array, which is why reordering or inserting breaks already-migrated computes.
    // Confirm in `crate::migration`.

    // Add new migrations in numerical order.
    // Because each templated file goes through `format!`, any literal `{` / `}` in its SQL
    // must be escaped as `{{` / `}}`. Every named argument must also appear in the file,
    // otherwise compilation fails.
    let migrations = [
        &format!(
            include_str!("./migrations/0001-add_bypass_rls_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0002-alter_roles.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0003-grant_pg_create_subscription_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0004-grant_pg_monitor_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0005-grant_all_on_tables_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0006-grant_all_on_sequences_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!(
                "./migrations/0007-grant_all_on_tables_with_grant_option_to_privileged_role.sql"
            ),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!(
                "./migrations/0008-grant_all_on_sequences_with_grant_option_to_privileged_role.sql"
            ),
            privileged_role_name = params.privileged_role_name
        ),
        // Not templated: this migration takes no role-name substitution.
        include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
        &format!(
            include_str!(
                "./migrations/0010-grant_snapshot_synchronization_funcs_to_privileged_role.sql"
            ),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!(
                "./migrations/0011-grant_pg_show_replication_origin_status_to_privileged_role.sql"
            ),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0012-grant_pg_signal_backend_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
    ];

    MigrationRunner::new(client, &migrations, lakebase_mode)
        .run_migrations()
        .await?;

    Ok(())
}
|