LCOV - differential code coverage report
Current view: top level - compute_tools/src - spec.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 28.1 % 153 43 110 43
Current Date: 2023-10-19 02:04:12 Functions: 43.1 % 51 22 29 22
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::fs::File;
       2                 : use std::path::Path;
       3                 : use std::str::FromStr;
       4                 : 
       5                 : use anyhow::{anyhow, bail, Result};
       6                 : use postgres::config::Config;
       7                 : use postgres::{Client, NoTls};
       8                 : use reqwest::StatusCode;
       9                 : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
      10                 : 
      11                 : use crate::config;
      12                 : use crate::params::PG_HBA_ALL_MD5;
      13                 : use crate::pg_helpers::*;
      14                 : 
      15                 : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
      16                 : use compute_api::spec::{ComputeSpec, PgIdent, Role};
      17                 : 
      18                 : // Do a control plane request and return the response, if any. In case of error it
      19                 : // returns a bool flag indicating whether it makes sense to retry the request
      20                 : // and a string with the error message.
      21 UBC           0 : fn do_control_plane_request(
      22               0 :     uri: &str,
      23               0 :     jwt: &str,
      24               0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
      25               0 :     let resp = reqwest::blocking::Client::new()
      26               0 :         .get(uri)
      27               0 :         .header("Authorization", jwt)
      28               0 :         .send()
      29               0 :         .map_err(|e| {
      30               0 :             (
      31               0 :                 true,
      32               0 :                 format!("could not perform spec request to control plane: {}", e),
      33               0 :             )
      34               0 :         })?;
      35                 : 
      36               0 :     match resp.status() {
      37               0 :         StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
      38               0 :             Ok(spec_resp) => Ok(spec_resp),
      39               0 :             Err(e) => Err((
      40               0 :                 true,
      41               0 :                 format!("could not deserialize control plane response: {}", e),
      42               0 :             )),
      43                 :         },
      44                 :         StatusCode::SERVICE_UNAVAILABLE => {
      45               0 :             Err((true, "control plane is temporarily unavailable".to_string()))
      46                 :         }
      47                 :         StatusCode::BAD_GATEWAY => {
      48                 :             // We have a problem with intermittent 502 errors now
      49                 :             // https://github.com/neondatabase/cloud/issues/2353
      50                 :             // It's fine to retry GET request in this case.
      51               0 :             Err((true, "control plane request failed with 502".to_string()))
      52                 :         }
      53                 :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      54                 :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      55               0 :         _ => Err((
      56               0 :             false,
      57               0 :             format!(
      58               0 :                 "unexpected control plane response status code: {}",
      59               0 :                 resp.status()
      60               0 :             ),
      61               0 :         )),
      62                 :     }
      63               0 : }
      64                 : 
      65                 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
      66                 : /// env variable is set, it will be used for authorization.
      67               0 : pub fn get_spec_from_control_plane(
      68               0 :     base_uri: &str,
      69               0 :     compute_id: &str,
      70               0 : ) -> Result<Option<ComputeSpec>> {
      71               0 :     let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
      72               0 :     let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
      73               0 :         Ok(v) => v,
      74               0 :         Err(_) => "".to_string(),
      75                 :     };
      76               0 :     let mut attempt = 1;
      77               0 :     let mut spec: Result<Option<ComputeSpec>> = Ok(None);
      78               0 : 
      79               0 :     info!("getting spec from control plane: {}", cp_uri);
      80                 : 
      81                 :     // Do 3 attempts to get spec from the control plane using the following logic:
      82                 :     // - network error -> then retry
      83                 :     // - compute id is unknown or any other error -> bail out
      84                 :     // - no spec for compute yet (Empty state) -> return Ok(None)
      85                 :     // - got spec -> return Ok(Some(spec))
      86               0 :     while attempt < 4 {
      87               0 :         spec = match do_control_plane_request(&cp_uri, &jwt) {
      88               0 :             Ok(spec_resp) => match spec_resp.status {
      89               0 :                 ControlPlaneComputeStatus::Empty => Ok(None),
      90                 :                 ControlPlaneComputeStatus::Attached => {
      91               0 :                     if let Some(spec) = spec_resp.spec {
      92               0 :                         Ok(Some(spec))
      93                 :                     } else {
      94               0 :                         bail!("compute is attached, but spec is empty")
      95                 :                     }
      96                 :                 }
      97                 :             },
      98               0 :             Err((retry, msg)) => {
      99               0 :                 if retry {
     100               0 :                     Err(anyhow!(msg))
     101                 :                 } else {
     102               0 :                     bail!(msg);
     103                 :                 }
     104                 :             }
     105                 :         };
     106                 : 
     107               0 :         if let Err(e) = &spec {
     108               0 :             error!("attempt {} to get spec failed with: {}", attempt, e);
     109                 :         } else {
     110               0 :             return spec;
     111                 :         }
     112                 : 
     113               0 :         attempt += 1;
     114               0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     115                 :     }
     116                 : 
     117                 :     // All attempts failed, return error.
     118               0 :     spec
     119               0 : }
     120                 : 
     121                 : /// It takes cluster specification and does the following:
     122                 : /// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
     123                 : /// - Update `pg_hba.conf` to allow external connections.
     124                 : pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
     125                 :     // File `postgresql.conf` is no longer included into `basebackup`, so just
     126                 :     // always write all config into it creating new file.
     127               0 :     config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
     128                 : 
     129               0 :     update_pg_hba(pgdata_path)?;
     130                 : 
     131               0 :     Ok(())
     132               0 : }
     133                 : 
     134                 : /// Check `pg_hba.conf` and update if needed to allow external connections.
     135 CBC         632 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     136             632 :     // XXX: consider making it a part of spec.json
     137             632 :     info!("checking pg_hba.conf");
     138             632 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     139             632 : 
     140             632 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     141             632 :         info!("updated pg_hba.conf to allow external connections");
     142                 :     } else {
     143 UBC           0 :         info!("pg_hba.conf is up-to-date");
     144                 :     }
     145                 : 
     146 CBC         632 :     Ok(())
     147             632 : }
     148                 : 
     149                 : /// Create a standby.signal file
     150              84 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     151              84 :     // XXX: consider making it a part of spec.json
     152              84 :     info!("adding standby.signal");
     153              84 :     let signalfile = pgdata_path.join("standby.signal");
     154              84 : 
     155              84 :     if !signalfile.exists() {
     156              84 :         info!("created standby.signal");
     157              84 :         File::create(signalfile)?;
     158                 :     } else {
     159 UBC           0 :         info!("reused pre-existing standby.signal");
     160                 :     }
     161 CBC          84 :     Ok(())
     162              84 : }
     163                 : 
     164                 : /// Compute could be unexpectedly shut down, for example, while a
     165                 : /// database is being dropped. This leaves the database in an invalid state,
     166                 : /// which prevents creating a new db with the same name. This function
     167                 : /// will clean it up before proceeding with catalog updates. Any
     168                 : /// possible future cleanup operations may go here too.
     169               1 : #[instrument(skip_all)]
     170                 : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
     171                 :     let existing_dbs = get_existing_dbs(client)?;
     172                 : 
     173                 :     for (_, db) in existing_dbs {
     174                 :         if db.invalid {
     175                 :             // After recent commit in Postgres, interrupted DROP DATABASE
     176                 :             // leaves the database in the invalid state. According to the
     177                 :             // commit message, the only option for user is to drop it again.
     178                 :             // See:
     179                 :             //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     180                 :             //
     181                 :             // The Neon Postgres extension is implemented so that a db is de-registered
     182                 :             // in the control plane metadata only after it is dropped. So there is
     183                 :             // a chance that the control plane still thinks the db should exist. This means
     184                 :             // that it will be re-created by `handle_databases()`. Yet, it's fine
     185                 :             // as the user can just repeat the drop (in vanilla Postgres they would need
     186                 :             // to do the same, btw).
     187                 :             let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
     188               1 :             info!("dropping invalid database {}", db.name);
     189                 :             client.execute(query.as_str(), &[])?;
     190                 :         }
     191                 :     }
     192                 : 
     193                 :     Ok(())
     194                 : }
     195                 : 
     196                 : /// Given a cluster spec and a Postgres client, it handles role renames, creation
     197                 : /// and updates in a single transaction. `delete_role` operations are a no-op here.
     198               1 : #[instrument(skip_all)]
     199                 : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     200                 :     let mut xact = client.transaction()?;
     201                 :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     202                 : 
     203                 :     // Print a list of existing Postgres roles (only if INFO-level tracing is enabled)
     204               1 :     if span_enabled!(Level::INFO) {
     205               1 :         info!("postgres roles:");
     206                 :         for r in &existing_roles {
     207              13 :             info!(
     208              13 :                 "    - {}:{}",
     209              13 :                 r.name,
     210              13 :                 if r.encrypted_password.is_some() {
     211              13 :                     "[FILTERED]"
     212              13 :                 } else {
     213              13 :                     "(null)"
     214              13 :                 }
     215              13 :             );
     216                 :         }
     217                 :     }
     218                 : 
     219                 :     // Process delta operations first
     220                 :     if let Some(ops) = &spec.delta_operations {
     221 UBC           0 :         info!("processing role renames");
     222                 :         for op in ops {
     223                 :             match op.action.as_ref() {
     224                 :                 "delete_role" => {
     225                 :                     // no-op now, roles will be deleted at the end of configuration
     226                 :                 }
     227                 :                 // Renaming role drops its password, since role name is
     228                 :                 // used as a salt there.  It is important that this role
     229                 :                 // is recorded with a new `name` in the `roles` list.
     230                 :                 // Follow up roles update will set the new password.
     231                 :                 "rename_role" => {
     232                 :                     let new_name = op.new_name.as_ref().unwrap();
     233                 : 
     234                 :                     // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     235               0 :                     if existing_roles.iter().any(|r| r.name == op.name) {
     236                 :                         let query: String = format!(
     237                 :                             "ALTER ROLE {} RENAME TO {}",
     238                 :                             op.name.pg_quote(),
     239                 :                             new_name.pg_quote()
     240                 :                         );
     241                 : 
     242               0 :                         warn!("renaming role '{}' to '{}'", op.name, new_name);
     243                 :                         xact.execute(query.as_str(), &[])?;
     244                 :                     }
     245                 :                 }
     246                 :                 _ => {}
     247                 :             }
     248                 :         }
     249                 :     }
     250                 : 
     251                 :     // Refresh Postgres roles info to handle possible roles renaming
     252                 :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     253                 : 
     254 CBC           1 :     info!("cluster spec roles:");
     255                 :     for role in &spec.cluster.roles {
     256                 :         let name = &role.name;
     257                 :         // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     258 UBC           0 :         let pg_role = existing_roles.iter().find(|r| r.name == *name);
     259                 : 
     260                 :         enum RoleAction {
     261                 :             None,
     262                 :             Update,
     263                 :             Create,
     264                 :         }
     265                 :         let action = if let Some(r) = pg_role {
     266                 :             if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
     267                 :                 || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
     268                 :             {
     269                 :                 RoleAction::Update
     270                 :             } else if let Some(pg_pwd) = &r.encrypted_password {
     271                 :                 // Check whether password changed or not (trim 'md5' prefix first if any)
     272                 :                 //
     273                 :                 // This is a backward compatibility hack, which comes from the times when we were using
     274                 :                 // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
     275                 :                 // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
     276                 :                 // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
     277                 :                 // Here is the only place so far where we compare hashes, so it seems to be the best candidate
     278                 :                 // to place this compatibility layer.
     279                 :                 let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
     280                 :                     stripped
     281                 :                 } else {
     282                 :                     pg_pwd
     283                 :                 };
     284                 :                 if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
     285                 :                     RoleAction::Update
     286                 :                 } else {
     287                 :                     RoleAction::None
     288                 :                 }
     289                 :             } else {
     290                 :                 RoleAction::None
     291                 :             }
     292                 :         } else {
     293                 :             RoleAction::Create
     294                 :         };
     295                 : 
     296                 :         match action {
     297                 :             RoleAction::None => {}
     298                 :             RoleAction::Update => {
     299                 :                 let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
     300                 :                 query.push_str(&role.to_pg_options());
     301                 :                 xact.execute(query.as_str(), &[])?;
     302                 :             }
     303                 :             RoleAction::Create => {
     304                 :                 let mut query: String = format!(
     305                 :                     "CREATE ROLE {} CREATEROLE CREATEDB BYPASSRLS IN ROLE neon_superuser",
     306                 :                     name.pg_quote()
     307                 :                 );
     308               0 :                 info!("role create query: '{}'", &query);
     309                 :                 query.push_str(&role.to_pg_options());
     310                 :                 xact.execute(query.as_str(), &[])?;
     311                 :             }
     312                 :         }
     313                 : 
     314               0 :         if span_enabled!(Level::INFO) {
     315                 :             let pwd = if role.encrypted_password.is_some() {
     316                 :                 "[FILTERED]"
     317                 :             } else {
     318                 :                 "(null)"
     319                 :             };
     320                 :             let action_str = match action {
     321                 :                 RoleAction::None => "",
     322                 :                 RoleAction::Create => " -> create",
     323                 :                 RoleAction::Update => " -> update",
     324                 :             };
     325               0 :             info!("   - {}:{}{}", name, pwd, action_str);
     326                 :         }
     327                 :     }
     328                 : 
     329                 :     xact.commit()?;
     330                 : 
     331                 :     Ok(())
     332                 : }
     333                 : 
     334                 : /// Reassign all dependent objects and delete requested roles.
     335 CBC           1 : #[instrument(skip_all)]
     336                 : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
     337                 :     if let Some(ops) = &spec.delta_operations {
     338                 :         // First, reassign all dependent objects to db owners.
     339 UBC           0 :         info!("reassigning dependent objects of to-be-deleted roles");
     340                 : 
     341                 :         // Fetch existing roles. We could've exported and used `existing_roles` from
     342                 :         // `handle_roles()`, but we only make this list there before creating new roles.
     343                 :         // Which is probably fine as we never create to-be-deleted roles, but that'd
     344                 :         // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
     345                 :         // buffers already, so this shouldn't be a big deal.
     346                 :         let mut xact = client.transaction()?;
     347                 :         let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     348                 :         xact.commit()?;
     349                 : 
     350                 :         for op in ops {
     351                 :             // Check that role is still present in Postgres, as this could be a
     352                 :             // restart with the same spec after role deletion.
     353               0 :             if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
     354                 :                 reassign_owned_objects(spec, connstr, &op.name)?;
     355                 :             }
     356                 :         }
     357                 : 
     358                 :         // Second, proceed with role deletions.
     359               0 :         info!("processing role deletions");
     360                 :         let mut xact = client.transaction()?;
     361                 :         for op in ops {
     362                 :             // We do not check whether the role exists or not;
     363                 :             // Postgres will take care of it for us
     364                 :             if op.action == "delete_role" {
     365                 :                 let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
     366                 : 
     367               0 :                 warn!("deleting role '{}'", &op.name);
     368                 :                 xact.execute(query.as_str(), &[])?;
     369                 :             }
     370                 :         }
     371                 :         xact.commit()?;
     372                 :     }
     373                 : 
     374                 :     Ok(())
     375                 : }
     376                 : 
     377                 : // Reassign all owned objects in all databases to the owner of the database.
     378               0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
     379               0 :     for db in &spec.cluster.databases {
     380               0 :         if db.owner != *role_name {
     381               0 :             let mut conf = Config::from_str(connstr)?;
     382               0 :             conf.dbname(&db.name);
     383                 : 
     384               0 :             let mut client = conf.connect(NoTls)?;
     385                 : 
     386                 :             // This will reassign all dependent objects to the db owner
     387               0 :             let reassign_query = format!(
     388               0 :                 "REASSIGN OWNED BY {} TO {}",
     389               0 :                 role_name.pg_quote(),
     390               0 :                 db.owner.pg_quote()
     391               0 :             );
     392               0 :             info!(
     393               0 :                 "reassigning objects owned by '{}' in db '{}' to '{}'",
     394               0 :                 role_name, &db.name, &db.owner
     395               0 :             );
     396               0 :             client.simple_query(&reassign_query)?;
     397                 : 
     398                 :             // This now will only drop privileges of the role
     399               0 :             let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
     400               0 :             client.simple_query(&drop_query)?;
     401               0 :         }
     402                 :     }
     403                 : 
     404               0 :     Ok(())
     405               0 : }
     406                 : 
     407                 : /// It follows mostly the same logic as `handle_roles()`, except that we
     408                 : /// do not use an explicit transaction block, since major database operations
     409                 : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
     410                 : /// atomicity should be enough here due to the order of operations and various checks,
     411                 : /// which together provide idempotency.
     412 CBC           1 : #[instrument(skip_all)]
     413                 : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     414                 :     let existing_dbs = get_existing_dbs(client)?;
     415                 : 
     416                 :     // Print a list of existing Postgres databases (only if INFO-level tracing is enabled)
     417               1 :     if span_enabled!(Level::INFO) {
     418               1 :         info!("postgres databases:");
     419                 :         for (dbname, db) in &existing_dbs {
     420               3 :             info!("    {}:{}", dbname, db.owner);
     421                 :         }
     422                 :     }
     423                 : 
     424                 :     // Process delta operations first
     425                 :     if let Some(ops) = &spec.delta_operations {
     426 UBC           0 :         info!("processing delta operations on databases");
     427                 :         for op in ops {
     428                 :             match op.action.as_ref() {
     429                 :                 // We do not check whether the DB exists or not;
     430                 :                 // Postgres will take care of it for us
     431                 :                 "delete_db" => {
     432                 :                     // In Postgres we can't drop a database if it is a template.
     433                 :                     // So we need to unset the template flag first, but it could
     434                 :                     // be a retry, so we could've already dropped the database.
     435                 :                     // Check that database exists first to make it idempotent.
     436                 :                     let unset_template_query: String = format!(
     437                 :                         "
     438                 :                         DO $$
     439                 :                         BEGIN
     440                 :                             IF EXISTS(
     441                 :                                 SELECT 1
     442                 :                                 FROM pg_catalog.pg_database
     443                 :                                 WHERE datname = {}
     444                 :                             )
     445                 :                             THEN
     446                 :                             ALTER DATABASE {} is_template false;
     447                 :                             END IF;
     448                 :                         END
     449                 :                         $$;",
     450                 :                         escape_literal(&op.name),
     451                 :                         &op.name.pg_quote()
     452                 :                     );
     453                 :                     // Use FORCE to drop database even if there are active connections.
     454                 :                     // We run this from `cloud_admin`, so it should have enough privileges.
     455                 :                     // NB: there could be other db states, which prevent us from dropping
     456                 :                     // the database. For example, if db is used by any active subscription
     457                 :                     // or replication slot.
     458                 :                     // TODO: deal with it once we allow logical replication. Proper fix should
     459                 :                     // involve returning an error code to the control plane, so it could
     460                 :                     // figure out that this is a non-retryable error, return it to the user
     461                 :                     // and fail operation permanently.
     462                 :                     let drop_db_query: String = format!(
     463                 :                         "DROP DATABASE IF EXISTS {} WITH (FORCE)",
     464                 :                         &op.name.pg_quote()
     465                 :                     );
     466                 : 
     467               0 :                     warn!("deleting database '{}'", &op.name);
     468                 :                     client.execute(unset_template_query.as_str(), &[])?;
     469                 :                     client.execute(drop_db_query.as_str(), &[])?;
     470                 :                 }
     471                 :                 "rename_db" => {
     472                 :                     let new_name = op.new_name.as_ref().unwrap();
     473                 : 
     474                 :                     if existing_dbs.get(&op.name).is_some() {
     475                 :                         let query: String = format!(
     476                 :                             "ALTER DATABASE {} RENAME TO {}",
     477                 :                             op.name.pg_quote(),
     478                 :                             new_name.pg_quote()
     479                 :                         );
     480                 : 
     481               0 :                         warn!("renaming database '{}' to '{}'", op.name, new_name);
     482                 :                         client.execute(query.as_str(), &[])?;
     483                 :                     }
     484                 :                 }
     485                 :                 _ => {}
     486                 :             }
     487                 :         }
     488                 :     }
     489                 : 
     490                 :     // Refresh Postgres databases info to handle possible renames
     491                 :     let existing_dbs = get_existing_dbs(client)?;
     492                 : 
     493 CBC           1 :     info!("cluster spec databases:");
     494                 :     for db in &spec.cluster.databases {
     495                 :         let name = &db.name;
     496                 :         let pg_db = existing_dbs.get(name);
     497                 : 
     498                 :         enum DatabaseAction {
     499                 :             None,
     500                 :             Update,
     501                 :             Create,
     502                 :         }
     503                 :         let action = if let Some(r) = pg_db {
     504                 :             // XXX: db owner name is returned as quoted string from Postgres,
     505                 :             // when quoting is needed.
     506                 :             let new_owner = if r.owner.starts_with('"') {
     507                 :                 db.owner.pg_quote()
     508                 :             } else {
     509                 :                 db.owner.clone()
     510                 :             };
     511                 : 
     512                 :             if new_owner != r.owner {
     513                 :                 // Update the owner
     514                 :                 DatabaseAction::Update
     515                 :             } else {
     516                 :                 DatabaseAction::None
     517                 :             }
     518                 :         } else {
     519                 :             DatabaseAction::Create
     520                 :         };
     521                 : 
     522                 :         match action {
     523                 :             DatabaseAction::None => {}
     524                 :             DatabaseAction::Update => {
     525                 :                 let query: String = format!(
     526                 :                     "ALTER DATABASE {} OWNER TO {}",
     527                 :                     name.pg_quote(),
     528                 :                     db.owner.pg_quote()
     529                 :                 );
     530                 :                 let _guard = info_span!("executing", query).entered();
     531                 :                 client.execute(query.as_str(), &[])?;
     532                 :             }
     533                 :             DatabaseAction::Create => {
     534                 :                 let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
     535                 :                 query.push_str(&db.to_pg_options());
     536                 :                 let _guard = info_span!("executing", query).entered();
     537                 :                 client.execute(query.as_str(), &[])?;
     538                 :                 let grant_query: String = format!(
     539                 :                     "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
     540                 :                     name.pg_quote()
     541                 :                 );
     542                 :                 client.execute(grant_query.as_str(), &[])?;
     543                 :             }
     544                 :         };
     545                 : 
     546 UBC           0 :         if span_enabled!(Level::INFO) {
     547                 :             let action_str = match action {
     548                 :                 DatabaseAction::None => "",
     549                 :                 DatabaseAction::Create => " -> create",
     550                 :                 DatabaseAction::Update => " -> update",
     551                 :             };
     552               0 :             info!("   - {}:{}{}", db.name, db.owner, action_str);
     553                 :         }
     554                 :     }
     555                 : 
     556                 :     Ok(())
     557                 : }
     558                 : 
     559                 : /// Apply per-database permission fixes: hand the `public` schema over to the database owner when
     560                 : /// an admin role still owns it and, on Postgres 15+, grant CREATE on it to `web_access` if present.
     561 CBC           1 : #[instrument(skip_all)]
     562                 : pub fn handle_grants(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
     563               1 :     info!("modifying database permissions");
     564                 :     let existing_dbs = get_existing_dbs(client)?;
     565                 : 
     566                 :     // Do the per-database access adjustments here. Ideally they would happen at db creation
     567                 :     // time, but CREATE DATABASE isn't transactional, so we cannot create a db and apply
     568                 :     // grants atomically. Each db gets its own connection, built from `connstr` below.
     569                 :     for db in &spec.cluster.databases {
     570                 :         match existing_dbs.get(&db.name) {
     571                 :             Some(pg_db) => {
     572                 :                 if pg_db.restrict_conn || pg_db.invalid {
     573 UBC           0 :                     info!(
     574               0 :                         "skipping grants for db {} (invalid: {}, connections not allowed: {})",
     575               0 :                         db.name, pg_db.invalid, pg_db.restrict_conn
     576               0 :                     );
     577                 :                     continue;
     578                 :                 }
     579                 :             }
     580                 :             None => {
     581                 :                 bail!(
     582                 :                     "database {} doesn't exist in Postgres after handle_databases()",
     583                 :                     db.name
     584                 :                 );
     585                 :             }
     586                 :         }
     587                 : 
     588                 :         let mut conf = Config::from_str(connstr)?;
     589                 :         conf.dbname(&db.name);
     590                 : 
     591                 :         let mut db_client = conf.connect(NoTls)?;
     592                 : 
     593                 :         // This only changes ownership of the schema itself, not of the objects
     594                 :         // inside it. Without it, the owner of the `public` schema would be `cloud_admin`
     595                 :         // and the database owner could not do anything with it. The DO block is a no-op
     596                 :         // if `public` doesn't exist or isn't owned by `cloud_admin` / `zenith_admin`.
     597                 :         let alter_query = format!(
     598                 :             "DO $$\n\
     599                 :                 DECLARE\n\
     600                 :                     schema_owner TEXT;\n\
     601                 :                 BEGIN\n\
     602                 :                     IF EXISTS(\n\
     603                 :                         SELECT nspname\n\
     604                 :                         FROM pg_catalog.pg_namespace\n\
     605                 :                         WHERE nspname = 'public'\n\
     606                 :                     )\n\
     607                 :                     THEN\n\
     608                 :                         SELECT nspowner::regrole::text\n\
     609                 :                             FROM pg_catalog.pg_namespace\n\
     610                 :                             WHERE nspname = 'public'\n\
     611                 :                             INTO schema_owner;\n\
     612                 :                 \n\
     613                 :                         IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
     614                 :                         THEN\n\
     615                 :                             ALTER SCHEMA public OWNER TO {};\n\
     616                 :                         END IF;\n\
     617                 :                     END IF;\n\
     618                 :                 END\n\
     619                 :             $$;",
     620                 :             db.owner.pg_quote()
     621                 :         );
     622                 :         db_client.simple_query(&alter_query)?;
     623                 : 
     624                 :         // Explicitly grant CREATE ON SCHEMA public to the `web_access` role.
     625                 :         // This is needed because, since Postgres 15, this privilege is no longer granted by default.
     626                 :         // TODO: `web_access` hasn't been created for almost a year. We may still have
     627                 :         // active users of year-old projects, but hopefully not, so check that and
     628                 :         // remove this code if possible. The worst that could happen is that such a
     629                 :         // user won't be able to use the `public` schema in NEW databases created in a
     630                 :         // very OLD project.
     631                 :         let grant_query = "DO $$\n\
     632                 :                 BEGIN\n\
     633                 :                     IF EXISTS(\n\
     634                 :                         SELECT nspname\n\
     635                 :                         FROM pg_catalog.pg_namespace\n\
     636                 :                         WHERE nspname = 'public'\n\
     637                 :                     ) AND\n\
     638                 :                     current_setting('server_version_num')::int/10000 >= 15\n\
     639                 :                     THEN\n\
     640                 :                         IF EXISTS(\n\
     641                 :                             SELECT rolname\n\
     642                 :                             FROM pg_catalog.pg_roles\n\
     643                 :                             WHERE rolname = 'web_access'\n\
     644                 :                         )\n\
     645                 :                         THEN\n\
     646                 :                             GRANT CREATE ON SCHEMA public TO web_access;\n\
     647                 :                         END IF;\n\
     648                 :                     END IF;\n\
     649                 :                 END\n\
     650                 :             $$;"
     651                 :         .to_string();
     652                 : 
     653               0 :         info!("grant query for db {} : {}", &db.name, &grant_query);
     654                 :         db_client.simple_query(&grant_query)?;
     655                 :     }
     656                 : 
     657                 :     Ok(())
     658                 : }
     659                 : 
     660                 : /// Create `pg_stat_statements` when the spec's `shared_preload_libraries` setting mentions it.
     661 CBC           1 : #[instrument(skip_all)]
     662                 : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     663                 :     if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
     664                 :         if libs.contains("pg_stat_statements") {
     665                 :             // Create the extension only if this compute really needs it, i.e. the library is listed
     666                 :             let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
     667 UBC           0 :             info!("creating system extensions with query: {}", query);
     668                 :             client.simple_query(query)?;
     669                 :         }
     670                 :     }
     671                 : 
     672                 :     Ok(())
     673                 : }
        

Generated by: LCOV version 2.1-beta