Line data Source code
1 : use std::fs::File;
2 : use std::path::Path;
3 : use std::str::FromStr;
4 :
5 : use anyhow::{anyhow, bail, Result};
6 : use postgres::config::Config;
7 : use postgres::{Client, NoTls};
8 : use reqwest::StatusCode;
9 : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
10 :
11 : use crate::config;
12 : use crate::params::PG_HBA_ALL_MD5;
13 : use crate::pg_helpers::*;
14 :
15 : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
16 : use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
17 :
18 : // Do control plane request and return response if any. In case of error it
19 : // returns a bool flag indicating whether it makes sense to retry the request
20 : // and a string with error message.
21 0 : fn do_control_plane_request(
22 0 : uri: &str,
23 0 : jwt: &str,
24 0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
25 0 : let resp = reqwest::blocking::Client::new()
26 0 : .get(uri)
27 0 : .header("Authorization", jwt)
28 0 : .send()
29 0 : .map_err(|e| {
30 0 : (
31 0 : true,
32 0 : format!("could not perform spec request to control plane: {}", e),
33 0 : )
34 0 : })?;
35 :
36 0 : match resp.status() {
37 0 : StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
38 0 : Ok(spec_resp) => Ok(spec_resp),
39 0 : Err(e) => Err((
40 0 : true,
41 0 : format!("could not deserialize control plane response: {}", e),
42 0 : )),
43 : },
44 : StatusCode::SERVICE_UNAVAILABLE => {
45 0 : Err((true, "control plane is temporarily unavailable".to_string()))
46 : }
47 : StatusCode::BAD_GATEWAY => {
48 : // We have a problem with intermittent 502 errors now
49 : // https://github.com/neondatabase/cloud/issues/2353
50 : // It's fine to retry GET request in this case.
51 0 : Err((true, "control plane request failed with 502".to_string()))
52 : }
53 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
54 : // or some internal failure happened. Doesn't make much sense to retry in this case.
55 0 : _ => Err((
56 0 : false,
57 0 : format!(
58 0 : "unexpected control plane response status code: {}",
59 0 : resp.status()
60 0 : ),
61 0 : )),
62 : }
63 0 : }
64 :
65 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
66 : /// env variable is set, it will be used for authorization.
67 0 : pub fn get_spec_from_control_plane(
68 0 : base_uri: &str,
69 0 : compute_id: &str,
70 0 : ) -> Result<Option<ComputeSpec>> {
71 0 : let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
72 0 : let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
73 0 : Ok(v) => v,
74 0 : Err(_) => "".to_string(),
75 : };
76 0 : let mut attempt = 1;
77 0 : let mut spec: Result<Option<ComputeSpec>> = Ok(None);
78 0 :
79 0 : info!("getting spec from control plane: {}", cp_uri);
80 :
81 : // Do 3 attempts to get spec from the control plane using the following logic:
82 : // - network error -> then retry
83 : // - compute id is unknown or any other error -> bail out
84 : // - no spec for compute yet (Empty state) -> return Ok(None)
85 : // - got spec -> return Ok(Some(spec))
86 0 : while attempt < 4 {
87 0 : spec = match do_control_plane_request(&cp_uri, &jwt) {
88 0 : Ok(spec_resp) => match spec_resp.status {
89 0 : ControlPlaneComputeStatus::Empty => Ok(None),
90 : ControlPlaneComputeStatus::Attached => {
91 0 : if let Some(spec) = spec_resp.spec {
92 0 : Ok(Some(spec))
93 : } else {
94 0 : bail!("compute is attached, but spec is empty")
95 : }
96 : }
97 : },
98 0 : Err((retry, msg)) => {
99 0 : if retry {
100 0 : Err(anyhow!(msg))
101 : } else {
102 0 : bail!(msg);
103 : }
104 : }
105 : };
106 :
107 0 : if let Err(e) = &spec {
108 0 : error!("attempt {} to get spec failed with: {}", attempt, e);
109 : } else {
110 0 : return spec;
111 : }
112 :
113 0 : attempt += 1;
114 0 : std::thread::sleep(std::time::Duration::from_millis(100));
115 : }
116 :
117 : // All attempts failed, return error.
118 0 : spec
119 0 : }
120 :
121 : /// It takes cluster specification and does the following:
122 : /// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
123 : /// - Update `pg_hba.conf` to allow external connections.
124 : pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
125 : // File `postgresql.conf` is no longer included into `basebackup`, so just
126 : // always write all config into it creating new file.
127 0 : config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
128 :
129 0 : update_pg_hba(pgdata_path)?;
130 :
131 0 : Ok(())
132 0 : }
133 :
134 : /// Check `pg_hba.conf` and update if needed to allow external connections.
135 654 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
136 654 : // XXX: consider making it a part of spec.json
137 654 : info!("checking pg_hba.conf");
138 654 : let pghba_path = pgdata_path.join("pg_hba.conf");
139 654 :
140 654 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
141 654 : info!("updated pg_hba.conf to allow external connections");
142 : } else {
143 0 : info!("pg_hba.conf is up-to-date");
144 : }
145 :
146 654 : Ok(())
147 654 : }
148 :
149 : /// Create a standby.signal file
150 84 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
151 84 : // XXX: consider making it a part of spec.json
152 84 : info!("adding standby.signal");
153 84 : let signalfile = pgdata_path.join("standby.signal");
154 84 :
155 84 : if !signalfile.exists() {
156 84 : info!("created standby.signal");
157 84 : File::create(signalfile)?;
158 : } else {
159 0 : info!("reused pre-existing standby.signal");
160 : }
161 84 : Ok(())
162 84 : }
163 :
164 : /// Given a cluster spec json and open transaction it handles roles creation,
165 : /// deletion and update.
166 0 : #[instrument(skip_all)]
167 : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
168 : let mut xact = client.transaction()?;
169 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
170 :
171 : // Print a list of existing Postgres roles (only in debug mode)
172 0 : if span_enabled!(Level::INFO) {
173 0 : info!("postgres roles:");
174 : for r in &existing_roles {
175 0 : info!(
176 0 : " - {}:{}",
177 0 : r.name,
178 0 : if r.encrypted_password.is_some() {
179 0 : "[FILTERED]"
180 0 : } else {
181 0 : "(null)"
182 0 : }
183 0 : );
184 : }
185 : }
186 :
187 : // Process delta operations first
188 : if let Some(ops) = &spec.delta_operations {
189 0 : info!("processing role renames");
190 : for op in ops {
191 : match op.action.as_ref() {
192 : "delete_role" => {
193 : // no-op now, roles will be deleted at the end of configuration
194 : }
195 : // Renaming role drops its password, since role name is
196 : // used as a salt there. It is important that this role
197 : // is recorded with a new `name` in the `roles` list.
198 : // Follow up roles update will set the new password.
199 : "rename_role" => {
200 : let new_name = op.new_name.as_ref().unwrap();
201 :
202 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
203 0 : if existing_roles.iter().any(|r| r.name == op.name) {
204 : let query: String = format!(
205 : "ALTER ROLE {} RENAME TO {}",
206 : op.name.pg_quote(),
207 : new_name.pg_quote()
208 : );
209 :
210 0 : warn!("renaming role '{}' to '{}'", op.name, new_name);
211 : xact.execute(query.as_str(), &[])?;
212 : }
213 : }
214 : _ => {}
215 : }
216 : }
217 : }
218 :
219 : // Refresh Postgres roles info to handle possible roles renaming
220 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
221 :
222 0 : info!("cluster spec roles:");
223 : for role in &spec.cluster.roles {
224 : let name = &role.name;
225 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
226 0 : let pg_role = existing_roles.iter().find(|r| r.name == *name);
227 :
228 : enum RoleAction {
229 : None,
230 : Update,
231 : Create,
232 : }
233 : let action = if let Some(r) = pg_role {
234 : if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
235 : || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
236 : {
237 : RoleAction::Update
238 : } else if let Some(pg_pwd) = &r.encrypted_password {
239 : // Check whether password changed or not (trim 'md5' prefix first if any)
240 : //
241 : // This is a backward compatibility hack, which comes from the times when we were using
242 : // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
243 : // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
244 : // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
245 : // Here is the only place so far where we compare hashes, so it seems to be the best candidate
246 : // to place this compatibility layer.
247 : let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
248 : stripped
249 : } else {
250 : pg_pwd
251 : };
252 : if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
253 : RoleAction::Update
254 : } else {
255 : RoleAction::None
256 : }
257 : } else {
258 : RoleAction::None
259 : }
260 : } else {
261 : RoleAction::Create
262 : };
263 :
264 : match action {
265 : RoleAction::None => {}
266 : RoleAction::Update => {
267 : let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
268 : query.push_str(&role.to_pg_options());
269 : xact.execute(query.as_str(), &[])?;
270 : }
271 : RoleAction::Create => {
272 : let mut query: String = format!(
273 : "CREATE ROLE {} CREATEROLE CREATEDB BYPASSRLS IN ROLE neon_superuser",
274 : name.pg_quote()
275 : );
276 0 : info!("role create query: '{}'", &query);
277 : query.push_str(&role.to_pg_options());
278 : xact.execute(query.as_str(), &[])?;
279 : }
280 : }
281 :
282 0 : if span_enabled!(Level::INFO) {
283 : let pwd = if role.encrypted_password.is_some() {
284 : "[FILTERED]"
285 : } else {
286 : "(null)"
287 : };
288 : let action_str = match action {
289 : RoleAction::None => "",
290 : RoleAction::Create => " -> create",
291 : RoleAction::Update => " -> update",
292 : };
293 0 : info!(" - {}:{}{}", name, pwd, action_str);
294 : }
295 : }
296 :
297 : xact.commit()?;
298 :
299 : Ok(())
300 : }
301 :
302 : /// Reassign all dependent objects and delete requested roles.
303 0 : #[instrument(skip_all)]
304 : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
305 : if let Some(ops) = &spec.delta_operations {
306 : // First, reassign all dependent objects to db owners.
307 0 : info!("reassigning dependent objects of to-be-deleted roles");
308 :
309 : // Fetch existing roles. We could've exported and used `existing_roles` from
310 : // `handle_roles()`, but we only make this list there before creating new roles.
311 : // Which is probably fine as we never create to-be-deleted roles, but that'd
312 : // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
313 : // buffers already, so this shouldn't be a big deal.
314 : let mut xact = client.transaction()?;
315 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
316 : xact.commit()?;
317 :
318 : for op in ops {
319 : // Check that role is still present in Postgres, as this could be a
320 : // restart with the same spec after role deletion.
321 0 : if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
322 : reassign_owned_objects(spec, connstr, &op.name)?;
323 : }
324 : }
325 :
326 : // Second, proceed with role deletions.
327 0 : info!("processing role deletions");
328 : let mut xact = client.transaction()?;
329 : for op in ops {
330 : // We do not check either role exists or not,
331 : // Postgres will take care of it for us
332 : if op.action == "delete_role" {
333 : let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
334 :
335 0 : warn!("deleting role '{}'", &op.name);
336 : xact.execute(query.as_str(), &[])?;
337 : }
338 : }
339 : xact.commit()?;
340 : }
341 :
342 : Ok(())
343 : }
344 :
345 : // Reassign all owned objects in all databases to the owner of the database.
346 0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
347 0 : for db in &spec.cluster.databases {
348 0 : if db.owner != *role_name {
349 0 : let mut conf = Config::from_str(connstr)?;
350 0 : conf.dbname(&db.name);
351 :
352 0 : let mut client = conf.connect(NoTls)?;
353 :
354 : // This will reassign all dependent objects to the db owner
355 0 : let reassign_query = format!(
356 0 : "REASSIGN OWNED BY {} TO {}",
357 0 : role_name.pg_quote(),
358 0 : db.owner.pg_quote()
359 0 : );
360 0 : info!(
361 0 : "reassigning objects owned by '{}' in db '{}' to '{}'",
362 0 : role_name, &db.name, &db.owner
363 0 : );
364 0 : client.simple_query(&reassign_query)?;
365 :
366 : // This now will only drop privileges of the role
367 0 : let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
368 0 : client.simple_query(&drop_query)?;
369 0 : }
370 : }
371 :
372 0 : Ok(())
373 0 : }
374 :
375 : /// It follows mostly the same logic as `handle_roles()` excepting that we
376 : /// does not use an explicit transactions block, since major database operations
377 : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
378 : /// atomicity should be enough here due to the order of operations and various checks,
379 : /// which together provide us idempotency.
380 0 : #[instrument(skip_all)]
381 : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
382 : let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
383 :
384 : // Print a list of existing Postgres databases (only in debug mode)
385 0 : if span_enabled!(Level::INFO) {
386 0 : info!("postgres databases:");
387 : for r in &existing_dbs {
388 0 : info!(" {}:{}", r.name, r.owner);
389 : }
390 : }
391 :
392 : // Process delta operations first
393 : if let Some(ops) = &spec.delta_operations {
394 0 : info!("processing delta operations on databases");
395 : for op in ops {
396 : match op.action.as_ref() {
397 : // We do not check either DB exists or not,
398 : // Postgres will take care of it for us
399 : "delete_db" => {
400 : // In Postgres we can't drop a database if it is a template.
401 : // So we need to unset the template flag first, but it could
402 : // be a retry, so we could've already dropped the database.
403 : // Check that database exists first to make it idempotent.
404 : let unset_template_query: String = format!(
405 : "
406 : DO $$
407 : BEGIN
408 : IF EXISTS(
409 : SELECT 1
410 : FROM pg_catalog.pg_database
411 : WHERE datname = {}
412 : )
413 : THEN
414 : ALTER DATABASE {} is_template false;
415 : END IF;
416 : END
417 : $$;",
418 : escape_literal(&op.name),
419 : &op.name.pg_quote()
420 : );
421 : // Use FORCE to drop database even if there are active connections.
422 : // We run this from `cloud_admin`, so it should have enough privileges.
423 : // NB: there could be other db states, which prevent us from dropping
424 : // the database. For example, if db is used by any active subscription
425 : // or replication slot.
426 : // TODO: deal with it once we allow logical replication. Proper fix should
427 : // involve returning an error code to the control plane, so it could
428 : // figure out that this is a non-retryable error, return it to the user
429 : // and fail operation permanently.
430 : let drop_db_query: String = format!(
431 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
432 : &op.name.pg_quote()
433 : );
434 :
435 0 : warn!("deleting database '{}'", &op.name);
436 : client.execute(unset_template_query.as_str(), &[])?;
437 : client.execute(drop_db_query.as_str(), &[])?;
438 : }
439 : "rename_db" => {
440 : let new_name = op.new_name.as_ref().unwrap();
441 :
442 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
443 0 : if existing_dbs.iter().any(|r| r.name == op.name) {
444 : let query: String = format!(
445 : "ALTER DATABASE {} RENAME TO {}",
446 : op.name.pg_quote(),
447 : new_name.pg_quote()
448 : );
449 :
450 0 : warn!("renaming database '{}' to '{}'", op.name, new_name);
451 : client.execute(query.as_str(), &[])?;
452 : }
453 : }
454 : _ => {}
455 : }
456 : }
457 : }
458 :
459 : // Refresh Postgres databases info to handle possible renames
460 : let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
461 :
462 0 : info!("cluster spec databases:");
463 : for db in &spec.cluster.databases {
464 : let name = &db.name;
465 :
466 : // XXX: with a limited number of databases it is fine, but consider making it a HashMap
467 0 : let pg_db = existing_dbs.iter().find(|r| r.name == *name);
468 :
469 : enum DatabaseAction {
470 : None,
471 : Update,
472 : Create,
473 : }
474 : let action = if let Some(r) = pg_db {
475 : // XXX: db owner name is returned as quoted string from Postgres,
476 : // when quoting is needed.
477 : let new_owner = if r.owner.starts_with('"') {
478 : db.owner.pg_quote()
479 : } else {
480 : db.owner.clone()
481 : };
482 :
483 : if new_owner != r.owner {
484 : // Update the owner
485 : DatabaseAction::Update
486 : } else {
487 : DatabaseAction::None
488 : }
489 : } else {
490 : DatabaseAction::Create
491 : };
492 :
493 : match action {
494 : DatabaseAction::None => {}
495 : DatabaseAction::Update => {
496 : let query: String = format!(
497 : "ALTER DATABASE {} OWNER TO {}",
498 : name.pg_quote(),
499 : db.owner.pg_quote()
500 : );
501 : let _guard = info_span!("executing", query).entered();
502 : client.execute(query.as_str(), &[])?;
503 : }
504 : DatabaseAction::Create => {
505 : let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
506 : query.push_str(&db.to_pg_options());
507 : let _guard = info_span!("executing", query).entered();
508 : client.execute(query.as_str(), &[])?;
509 : let grant_query: String = format!(
510 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
511 : name.pg_quote()
512 : );
513 : client.execute(grant_query.as_str(), &[])?;
514 : }
515 : };
516 :
517 0 : if span_enabled!(Level::INFO) {
518 : let action_str = match action {
519 : DatabaseAction::None => "",
520 : DatabaseAction::Create => " -> create",
521 : DatabaseAction::Update => " -> update",
522 : };
523 0 : info!(" - {}:{}{}", db.name, db.owner, action_str);
524 : }
525 : }
526 :
527 : Ok(())
528 : }
529 :
530 : /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
531 : /// to allow users creating trusted extensions and re-creating `public` schema, for example.
532 0 : #[instrument(skip_all)]
533 : pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> {
534 0 : info!("cluster spec grants:");
535 :
536 : // Do some per-database access adjustments. We'd better do this at db creation time,
537 : // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
538 : // atomically.
539 : for db in &spec.cluster.databases {
540 : let mut conf = Config::from_str(connstr)?;
541 : conf.dbname(&db.name);
542 :
543 : let mut db_client = conf.connect(NoTls)?;
544 :
545 : // This will only change ownership on the schema itself, not the objects
546 : // inside it. Without it owner of the `public` schema will be `cloud_admin`
547 : // and database owner cannot do anything with it. SQL procedure ensures
548 : // that it won't error out if schema `public` doesn't exist.
549 : let alter_query = format!(
550 : "DO $$\n\
551 : DECLARE\n\
552 : schema_owner TEXT;\n\
553 : BEGIN\n\
554 : IF EXISTS(\n\
555 : SELECT nspname\n\
556 : FROM pg_catalog.pg_namespace\n\
557 : WHERE nspname = 'public'\n\
558 : )\n\
559 : THEN\n\
560 : SELECT nspowner::regrole::text\n\
561 : FROM pg_catalog.pg_namespace\n\
562 : WHERE nspname = 'public'\n\
563 : INTO schema_owner;\n\
564 : \n\
565 : IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
566 : THEN\n\
567 : ALTER SCHEMA public OWNER TO {};\n\
568 : END IF;\n\
569 : END IF;\n\
570 : END\n\
571 : $$;",
572 : db.owner.pg_quote()
573 : );
574 : db_client.simple_query(&alter_query)?;
575 :
576 : // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
577 : // This is needed because since postgres 15 this privilege is removed by default.
578 : let grant_query = "DO $$\n\
579 : BEGIN\n\
580 : IF EXISTS(\n\
581 : SELECT nspname\n\
582 : FROM pg_catalog.pg_namespace\n\
583 : WHERE nspname = 'public'\n\
584 : ) AND\n\
585 : current_setting('server_version_num')::int/10000 >= 15\n\
586 : THEN\n\
587 : IF EXISTS(\n\
588 : SELECT rolname\n\
589 : FROM pg_catalog.pg_roles\n\
590 : WHERE rolname = 'web_access'\n\
591 : )\n\
592 : THEN\n\
593 : GRANT CREATE ON SCHEMA public TO web_access;\n\
594 : END IF;\n\
595 : END IF;\n\
596 : END\n\
597 : $$;"
598 : .to_string();
599 :
600 0 : info!("grant query for db {} : {}", &db.name, &grant_query);
601 : db_client.simple_query(&grant_query)?;
602 : }
603 :
604 : Ok(())
605 : }
606 :
607 : /// Create required system extensions
608 0 : #[instrument(skip_all)]
609 : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
610 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
611 : if libs.contains("pg_stat_statements") {
612 : // Create extension only if this compute really needs it
613 : let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
614 0 : info!("creating system extensions with query: {}", query);
615 : client.simple_query(query)?;
616 : }
617 : }
618 :
619 : Ok(())
620 : }
|