LCOV - differential code coverage report
Current view: top level - compute_tools/src - spec.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 28.7 % 167 48 119 48
Current Date: 2024-01-09 02:06:09 Functions: 48.2 % 56 27 29 27
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::fs::File;
       2                 : use std::path::Path;
       3                 : use std::str::FromStr;
       4                 : 
       5                 : use anyhow::{anyhow, bail, Result};
       6                 : use postgres::config::Config;
       7                 : use postgres::{Client, NoTls};
       8                 : use reqwest::StatusCode;
       9                 : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
      10                 : 
      11                 : use crate::config;
      12                 : use crate::params::PG_HBA_ALL_MD5;
      13                 : use crate::pg_helpers::*;
      14                 : 
      15                 : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
      16                 : use compute_api::spec::{ComputeSpec, PgIdent, Role};
      17                 : 
      18                 : // Do control plane request and return response if any. In case of error it
      19                 : // returns a bool flag indicating whether it makes sense to retry the request
      20                 : // and a string with error message.
      21 UBC           0 : fn do_control_plane_request(
      22               0 :     uri: &str,
      23               0 :     jwt: &str,
      24               0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
      25               0 :     let resp = reqwest::blocking::Client::new()
      26               0 :         .get(uri)
      27               0 :         .header("Authorization", format!("Bearer {}", jwt))
      28               0 :         .send()
      29               0 :         .map_err(|e| {
      30               0 :             (
      31               0 :                 true,
      32               0 :                 format!("could not perform spec request to control plane: {}", e),
      33               0 :             )
      34               0 :         })?;
      35                 : 
      36               0 :     match resp.status() {
      37               0 :         StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
      38               0 :             Ok(spec_resp) => Ok(spec_resp),
      39               0 :             Err(e) => Err((
      40               0 :                 true,
      41               0 :                 format!("could not deserialize control plane response: {}", e),
      42               0 :             )),
      43                 :         },
      44                 :         StatusCode::SERVICE_UNAVAILABLE => {
      45               0 :             Err((true, "control plane is temporarily unavailable".to_string()))
      46                 :         }
      47                 :         StatusCode::BAD_GATEWAY => {
      48                 :             // We have a problem with intermittent 502 errors now
      49                 :             // https://github.com/neondatabase/cloud/issues/2353
      50                 :             // It's fine to retry GET request in this case.
      51               0 :             Err((true, "control plane request failed with 502".to_string()))
      52                 :         }
      53                 :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      54                 :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      55               0 :         _ => Err((
      56               0 :             false,
      57               0 :             format!(
      58               0 :                 "unexpected control plane response status code: {}",
      59               0 :                 resp.status()
      60               0 :             ),
      61               0 :         )),
      62                 :     }
      63               0 : }
      64                 : 
      65                 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
      66                 : /// env variable is set, it will be used for authorization.
      67               0 : pub fn get_spec_from_control_plane(
      68               0 :     base_uri: &str,
      69               0 :     compute_id: &str,
      70               0 : ) -> Result<Option<ComputeSpec>> {
      71               0 :     let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
      72               0 :     let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
      73               0 :         Ok(v) => v,
      74               0 :         Err(_) => "".to_string(),
      75                 :     };
      76               0 :     let mut attempt = 1;
      77               0 :     let mut spec: Result<Option<ComputeSpec>> = Ok(None);
      78               0 : 
      79               0 :     info!("getting spec from control plane: {}", cp_uri);
      80                 : 
      81                 :     // Do 3 attempts to get spec from the control plane using the following logic:
      82                 :     // - network error -> then retry
      83                 :     // - compute id is unknown or any other error -> bail out
      84                 :     // - no spec for compute yet (Empty state) -> return Ok(None)
      85                 :     // - got spec -> return Ok(Some(spec))
      86               0 :     while attempt < 4 {
      87               0 :         spec = match do_control_plane_request(&cp_uri, &jwt) {
      88               0 :             Ok(spec_resp) => match spec_resp.status {
      89               0 :                 ControlPlaneComputeStatus::Empty => Ok(None),
      90                 :                 ControlPlaneComputeStatus::Attached => {
      91               0 :                     if let Some(spec) = spec_resp.spec {
      92               0 :                         Ok(Some(spec))
      93                 :                     } else {
      94               0 :                         bail!("compute is attached, but spec is empty")
      95                 :                     }
      96                 :                 }
      97                 :             },
      98               0 :             Err((retry, msg)) => {
      99               0 :                 if retry {
     100               0 :                     Err(anyhow!(msg))
     101                 :                 } else {
     102               0 :                     bail!(msg);
     103                 :                 }
     104                 :             }
     105                 :         };
     106                 : 
     107               0 :         if let Err(e) = &spec {
     108               0 :             error!("attempt {} to get spec failed with: {}", attempt, e);
     109                 :         } else {
     110               0 :             return spec;
     111                 :         }
     112                 : 
     113               0 :         attempt += 1;
     114               0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     115                 :     }
     116                 : 
     117                 :     // All attempts failed, return error.
     118               0 :     spec
     119               0 : }
     120                 : 
     121                 : /// Check `pg_hba.conf` and update if needed to allow external connections.
     122 CBC         537 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     123             537 :     // XXX: consider making it a part of spec.json
     124             537 :     info!("checking pg_hba.conf");
     125             537 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     126             537 : 
     127             537 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     128             537 :         info!("updated pg_hba.conf to allow external connections");
     129                 :     } else {
     130 UBC           0 :         info!("pg_hba.conf is up-to-date");
     131                 :     }
     132                 : 
     133 CBC         537 :     Ok(())
     134             537 : }
     135                 : 
     136                 : /// Create a standby.signal file
     137              46 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     138              46 :     // XXX: consider making it a part of spec.json
     139              46 :     info!("adding standby.signal");
     140              46 :     let signalfile = pgdata_path.join("standby.signal");
     141              46 : 
     142              46 :     if !signalfile.exists() {
     143              46 :         info!("created standby.signal");
     144              46 :         File::create(signalfile)?;
     145                 :     } else {
     146 UBC           0 :         info!("reused pre-existing standby.signal");
     147                 :     }
     148 CBC          46 :     Ok(())
     149              46 : }
     150                 : 
     151                 : /// Compute could be unexpectedly shut down, for example, during the
     152                 : /// database dropping. This leaves the database in the invalid state,
     153                 : /// which prevents new db creation with the same name. This function
     154                 : /// will clean it up before proceeding with catalog updates. All
     155                 : /// possible future cleanup operations may go here too.
     156             221 : #[instrument(skip_all)]
     157                 : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
     158                 :     let existing_dbs = get_existing_dbs(client)?;
     159                 : 
     160                 :     for (_, db) in existing_dbs {
     161                 :         if db.invalid {
     162                 :             // After recent commit in Postgres, interrupted DROP DATABASE
     163                 :             // leaves the database in the invalid state. According to the
     164                 :             // commit message, the only option for user is to drop it again.
     165                 :             // See:
     166                 :             //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     167                 :             //
     168                 :             // Postgres Neon extension is done the way, that db is de-registered
     169                 :             // in the control plane metadata only after it is dropped. So there is
     170                 :             // a chance that it still thinks that db should exist. This means
     171                 :             // that it will be re-created by `handle_databases()`. Yet, it's fine
     172                 :             // as user can just repeat drop (in vanilla Postgres they would need
     173                 :             // to do the same, btw).
     174                 :             let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
     175               1 :             info!("dropping invalid database {}", db.name);
     176                 :             client.execute(query.as_str(), &[])?;
     177                 :         }
     178                 :     }
     179                 : 
     180                 :     Ok(())
     181                 : }
     182                 : 
     183                 : /// Given a cluster spec json and open transaction it handles roles creation,
     184                 : /// deletion and update.
     185             221 : #[instrument(skip_all)]
     186                 : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     187                 :     let mut xact = client.transaction()?;
     188                 :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     189                 : 
     190                 :     // Print a list of existing Postgres roles (only in debug mode)
     191             221 :     if span_enabled!(Level::INFO) {
     192             221 :         info!("postgres roles:");
     193                 :         for r in &existing_roles {
     194            2656 :             info!(
     195            2656 :                 "    - {}:{}",
     196            2656 :                 r.name,
     197            2656 :                 if r.encrypted_password.is_some() {
     198            2656 :                     "[FILTERED]"
     199            2656 :                 } else {
     200            2656 :                     "(null)"
     201            2656 :                 }
     202            2656 :             );
     203                 :         }
     204                 :     }
     205                 : 
     206                 :     // Process delta operations first
     207                 :     if let Some(ops) = &spec.delta_operations {
     208 UBC           0 :         info!("processing role renames");
     209                 :         for op in ops {
     210                 :             match op.action.as_ref() {
     211                 :                 "delete_role" => {
     212                 :                     // no-op now, roles will be deleted at the end of configuration
     213                 :                 }
     214                 :                 // Renaming role drops its password, since role name is
     215                 :                 // used as a salt there.  It is important that this role
     216                 :                 // is recorded with a new `name` in the `roles` list.
     217                 :                 // Follow up roles update will set the new password.
     218                 :                 "rename_role" => {
     219                 :                     let new_name = op.new_name.as_ref().unwrap();
     220                 : 
     221                 :                     // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     222               0 :                     if existing_roles.iter().any(|r| r.name == op.name) {
     223                 :                         let query: String = format!(
     224                 :                             "ALTER ROLE {} RENAME TO {}",
     225                 :                             op.name.pg_quote(),
     226                 :                             new_name.pg_quote()
     227                 :                         );
     228                 : 
     229               0 :                         warn!("renaming role '{}' to '{}'", op.name, new_name);
     230                 :                         xact.execute(query.as_str(), &[])?;
     231                 :                     }
     232                 :                 }
     233                 :                 _ => {}
     234                 :             }
     235                 :         }
     236                 :     }
     237                 : 
     238                 :     // Refresh Postgres roles info to handle possible roles renaming
     239                 :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     240                 : 
     241 CBC         221 :     info!("cluster spec roles:");
     242                 :     for role in &spec.cluster.roles {
     243                 :         let name = &role.name;
     244                 :         // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     245 UBC           0 :         let pg_role = existing_roles.iter().find(|r| r.name == *name);
     246                 : 
     247                 :         enum RoleAction {
     248                 :             None,
     249                 :             Update,
     250                 :             Create,
     251                 :         }
     252                 :         let action = if let Some(r) = pg_role {
     253                 :             if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
     254                 :                 || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
     255                 :             {
     256                 :                 RoleAction::Update
     257                 :             } else if let Some(pg_pwd) = &r.encrypted_password {
     258                 :                 // Check whether password changed or not (trim 'md5' prefix first if any)
     259                 :                 //
     260                 :                 // This is a backward compatibility hack, which comes from the times when we were using
     261                 :                 // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
     262                 :                 // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
     263                 :                 // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
     264                 :                 // Here is the only place so far where we compare hashes, so it seems to be the best candidate
     265                 :                 // to place this compatibility layer.
     266                 :                 let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
     267                 :                     stripped
     268                 :                 } else {
     269                 :                     pg_pwd
     270                 :                 };
     271                 :                 if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
     272                 :                     RoleAction::Update
     273                 :                 } else {
     274                 :                     RoleAction::None
     275                 :                 }
     276                 :             } else {
     277                 :                 RoleAction::None
     278                 :             }
     279                 :         } else {
     280                 :             RoleAction::Create
     281                 :         };
     282                 : 
     283                 :         match action {
     284                 :             RoleAction::None => {}
     285                 :             RoleAction::Update => {
     286                 :                 // This can be run on /every/ role! Not just ones created through the console.
     287                 :                 // This means that if you add some funny ALTER here that adds a permission,
     288                 :                 // this will get run even on user-created roles! This will result in different
     289                 :                 // behavior before and after a spec gets reapplied. The below ALTER as it stands
     290                 :                 // now only grants LOGIN and changes the password. Please do not allow this branch
     291                 :                 // to do anything silly.
     292                 :                 let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
     293                 :                 query.push_str(&role.to_pg_options());
     294                 :                 xact.execute(query.as_str(), &[])?;
     295                 :             }
     296                 :             RoleAction::Create => {
     297                 :                 // This branch only runs when roles are created through the console, so it is
     298                 :                 // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
     299                 :                 // from neon_superuser.
     300                 :                 let mut query: String = format!(
     301                 :                     "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
     302                 :                     name.pg_quote()
     303                 :                 );
     304               0 :                 info!("role create query: '{}'", &query);
     305                 :                 query.push_str(&role.to_pg_options());
     306                 :                 xact.execute(query.as_str(), &[])?;
     307                 :             }
     308                 :         }
     309                 : 
     310               0 :         if span_enabled!(Level::INFO) {
     311                 :             let pwd = if role.encrypted_password.is_some() {
     312                 :                 "[FILTERED]"
     313                 :             } else {
     314                 :                 "(null)"
     315                 :             };
     316                 :             let action_str = match action {
     317                 :                 RoleAction::None => "",
     318                 :                 RoleAction::Create => " -> create",
     319                 :                 RoleAction::Update => " -> update",
     320                 :             };
     321               0 :             info!("   - {}:{}{}", name, pwd, action_str);
     322                 :         }
     323                 :     }
     324                 : 
     325                 :     xact.commit()?;
     326                 : 
     327                 :     Ok(())
     328                 : }
     329                 : 
     330                 : /// Reassign all dependent objects and delete requested roles.
     331 CBC         221 : #[instrument(skip_all)]
     332                 : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
     333                 :     if let Some(ops) = &spec.delta_operations {
     334                 :         // First, reassign all dependent objects to db owners.
     335 UBC           0 :         info!("reassigning dependent objects of to-be-deleted roles");
     336                 : 
     337                 :         // Fetch existing roles. We could've exported and used `existing_roles` from
     338                 :         // `handle_roles()`, but we only make this list there before creating new roles.
     339                 :         // Which is probably fine as we never create to-be-deleted roles, but that'd
     340                 :         // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
     341                 :         // buffers already, so this shouldn't be a big deal.
     342                 :         let mut xact = client.transaction()?;
     343                 :         let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     344                 :         xact.commit()?;
     345                 : 
     346                 :         for op in ops {
     347                 :             // Check that role is still present in Postgres, as this could be a
     348                 :             // restart with the same spec after role deletion.
     349               0 :             if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
     350                 :                 reassign_owned_objects(spec, connstr, &op.name)?;
     351                 :             }
     352                 :         }
     353                 : 
     354                 :         // Second, proceed with role deletions.
     355               0 :         info!("processing role deletions");
     356                 :         let mut xact = client.transaction()?;
     357                 :         for op in ops {
     358                 :             // We do not check either role exists or not,
     359                 :             // Postgres will take care of it for us
     360                 :             if op.action == "delete_role" {
     361                 :                 let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
     362                 : 
     363               0 :                 warn!("deleting role '{}'", &op.name);
     364                 :                 xact.execute(query.as_str(), &[])?;
     365                 :             }
     366                 :         }
     367                 :         xact.commit()?;
     368                 :     }
     369                 : 
     370                 :     Ok(())
     371                 : }
     372                 : 
     373               0 : fn reassign_owned_objects_in_one_db(
     374               0 :     conf: Config,
     375               0 :     role_name: &PgIdent,
     376               0 :     db_owner: &PgIdent,
     377               0 : ) -> Result<()> {
     378               0 :     let mut client = conf.connect(NoTls)?;
     379                 : 
     380                 :     // This will reassign all dependent objects to the db owner
     381               0 :     let reassign_query = format!(
     382               0 :         "REASSIGN OWNED BY {} TO {}",
     383               0 :         role_name.pg_quote(),
     384               0 :         db_owner.pg_quote()
     385               0 :     );
     386               0 :     info!(
     387               0 :         "reassigning objects owned by '{}' in db '{}' to '{}'",
     388               0 :         role_name,
     389               0 :         conf.get_dbname().unwrap_or(""),
     390               0 :         db_owner
     391               0 :     );
     392               0 :     client.simple_query(&reassign_query)?;
     393                 : 
     394                 :     // This now will only drop privileges of the role
     395               0 :     let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
     396               0 :     client.simple_query(&drop_query)?;
     397               0 :     Ok(())
     398               0 : }
     399                 : 
     400                 : // Reassign all owned objects in all databases to the owner of the database.
     401               0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
     402               0 :     for db in &spec.cluster.databases {
     403               0 :         if db.owner != *role_name {
     404               0 :             let mut conf = Config::from_str(connstr)?;
     405               0 :             conf.dbname(&db.name);
     406               0 :             reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
     407               0 :         }
     408                 :     }
     409                 : 
     410                 :     // Also handle case when there are no databases in the spec.
     411                 :     // In this case we need to reassign objects in the default database.
     412               0 :     let conf = Config::from_str(connstr)?;
     413               0 :     let db_owner = PgIdent::from_str("cloud_admin")?;
     414               0 :     reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
     415                 : 
     416               0 :     Ok(())
     417               0 : }
     418                 : 
     419                 : /// It follows mostly the same logic as `handle_roles()`, except that we
     420                 : /// do not use an explicit transaction block, since major database operations
     421                 : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
     422                 : /// atomicity should be enough here due to the order of operations and various checks,
     423                 : /// which together provide us idempotency.
     424 CBC         221 : #[instrument(skip_all)]
     425                 : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     426                 :     let existing_dbs = get_existing_dbs(client)?;
     427                 : 
     428                 :     // Print a list of existing Postgres databases (only in debug mode)
     429             221 :     if span_enabled!(Level::INFO) {
     430             221 :         info!("postgres databases:");
     431                 :         for (dbname, db) in &existing_dbs {
     432             663 :             info!("    {}:{}", dbname, db.owner);
     433                 :         }
     434                 :     }
     435                 : 
     436                 :     // Process delta operations first
     437                 :     if let Some(ops) = &spec.delta_operations {
     438 UBC           0 :         info!("processing delta operations on databases");
     439                 :         for op in ops {
     440                 :             match op.action.as_ref() {
     441                 :                 // We do not check either DB exists or not,
     442                 :                 // Postgres will take care of it for us
     443                 :                 "delete_db" => {
     444                 :                     // In Postgres we can't drop a database if it is a template.
     445                 :                     // So we need to unset the template flag first, but it could
     446                 :                     // be a retry, so we could've already dropped the database.
     447                 :                     // Check that database exists first to make it idempotent.
     448                 :                     let unset_template_query: String = format!(
     449                 :                         "
     450                 :                         DO $$
     451                 :                         BEGIN
     452                 :                             IF EXISTS(
     453                 :                                 SELECT 1
     454                 :                                 FROM pg_catalog.pg_database
     455                 :                                 WHERE datname = {}
     456                 :                             )
     457                 :                             THEN
     458                 :                             ALTER DATABASE {} is_template false;
     459                 :                             END IF;
     460                 :                         END
     461                 :                         $$;",
     462                 :                         escape_literal(&op.name),
     463                 :                         &op.name.pg_quote()
     464                 :                     );
     465                 :                     // Use FORCE to drop database even if there are active connections.
     466                 :                     // We run this from `cloud_admin`, so it should have enough privileges.
     467                 :                     // NB: there could be other db states, which prevent us from dropping
     468                 :                     // the database. For example, if db is used by any active subscription
     469                 :                     // or replication slot.
     470                 :                     // TODO: deal with it once we allow logical replication. Proper fix should
     471                 :                     // involve returning an error code to the control plane, so it could
     472                 :                     // figure out that this is a non-retryable error, return it to the user
     473                 :                     // and fail operation permanently.
     474                 :                     let drop_db_query: String = format!(
     475                 :                         "DROP DATABASE IF EXISTS {} WITH (FORCE)",
     476                 :                         &op.name.pg_quote()
     477                 :                     );
     478                 : 
     479               0 :                     warn!("deleting database '{}'", &op.name);
     480                 :                     client.execute(unset_template_query.as_str(), &[])?;
     481                 :                     client.execute(drop_db_query.as_str(), &[])?;
     482                 :                 }
     483                 :                 "rename_db" => {
     484                 :                     let new_name = op.new_name.as_ref().unwrap();
     485                 : 
     486                 :                     if existing_dbs.get(&op.name).is_some() {
     487                 :                         let query: String = format!(
     488                 :                             "ALTER DATABASE {} RENAME TO {}",
     489                 :                             op.name.pg_quote(),
     490                 :                             new_name.pg_quote()
     491                 :                         );
     492                 : 
     493               0 :                         warn!("renaming database '{}' to '{}'", op.name, new_name);
     494                 :                         client.execute(query.as_str(), &[])?;
     495                 :                     }
     496                 :                 }
     497                 :                 _ => {}
     498                 :             }
     499                 :         }
     500                 :     }
     501                 : 
     502                 :     // Refresh Postgres databases info to handle possible renames
     503                 :     let existing_dbs = get_existing_dbs(client)?;
     504                 : 
     505 CBC         221 :     info!("cluster spec databases:");
     506                 :     for db in &spec.cluster.databases {
     507                 :         let name = &db.name;
     508                 :         let pg_db = existing_dbs.get(name);
     509                 : 
     510                 :         enum DatabaseAction {
     511                 :             None,
     512                 :             Update,
     513                 :             Create,
     514                 :         }
     515                 :         let action = if let Some(r) = pg_db {
     516                 :             // XXX: db owner name is returned as quoted string from Postgres,
     517                 :             // when quoting is needed.
     518                 :             let new_owner = if r.owner.starts_with('"') {
     519                 :                 db.owner.pg_quote()
     520                 :             } else {
     521                 :                 db.owner.clone()
     522                 :             };
     523                 : 
     524                 :             if new_owner != r.owner {
     525                 :                 // Update the owner
     526                 :                 DatabaseAction::Update
     527                 :             } else {
     528                 :                 DatabaseAction::None
     529                 :             }
     530                 :         } else {
     531                 :             DatabaseAction::Create
     532                 :         };
     533                 : 
     534                 :         match action {
     535                 :             DatabaseAction::None => {}
     536                 :             DatabaseAction::Update => {
     537                 :                 let query: String = format!(
     538                 :                     "ALTER DATABASE {} OWNER TO {}",
     539                 :                     name.pg_quote(),
     540                 :                     db.owner.pg_quote()
     541                 :                 );
     542                 :                 let _guard = info_span!("executing", query).entered();
     543                 :                 client.execute(query.as_str(), &[])?;
     544                 :             }
     545                 :             DatabaseAction::Create => {
     546                 :                 let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
     547                 :                 query.push_str(&db.to_pg_options());
     548                 :                 let _guard = info_span!("executing", query).entered();
     549                 :                 client.execute(query.as_str(), &[])?;
     550                 :                 let grant_query: String = format!(
     551                 :                     "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
     552                 :                     name.pg_quote()
     553                 :                 );
     554                 :                 client.execute(grant_query.as_str(), &[])?;
     555                 :             }
     556                 :         };
     557                 : 
     558 UBC           0 :         if span_enabled!(Level::INFO) {
     559                 :             let action_str = match action {
     560                 :                 DatabaseAction::None => "",
     561                 :                 DatabaseAction::Create => " -> create",
     562                 :                 DatabaseAction::Update => " -> update",
     563                 :             };
     564               0 :             info!("   - {}:{}{}", db.name, db.owner, action_str);
     565                 :         }
     566                 :     }
     567                 : 
     568                 :     Ok(())
     569                 : }
     570                 : 
     571                 : /// For each spec database, connect to it and adjust `public` schema access: reassign the schema to the database owner if it is owned by `cloud_admin`/`zenith_admin`, and on Postgres >= 15 grant CREATE on it to `web_access` when that role exists.
     572                 : /// Databases reported as invalid or not accepting connections are skipped; a spec database missing from Postgres is an error. The aim is to let users, for example, create trusted extensions and re-create the `public` schema.
     573 CBC         221 : #[instrument(skip_all)]
     574                 : pub fn handle_grants(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
     575             221 :     info!("modifying database permissions");
     576                 :     let existing_dbs = get_existing_dbs(client)?; // databases Postgres currently reports, looked up by name below
     577                 : 
     578                 :     // Do some per-database access adjustments. Ideally this would happen at db creation time,
     579                 :     // but CREATE DATABASE isn't transactional, so we cannot create the db and apply the grants
     580                 :     // atomically.
     581                 :     for db in &spec.cluster.databases {
     582                 :         match existing_dbs.get(&db.name) {
     583                 :             Some(pg_db) => {
     584                 :                 if pg_db.restrict_conn || pg_db.invalid {
     585 UBC           0 :                     info!(
     586               0 :                         "skipping grants for db {} (invalid: {}, connections not allowed: {})",
     587               0 :                         db.name, pg_db.invalid, pg_db.restrict_conn
     588               0 :                     );
     589                 :                     continue;
     590                 :                 }
     591                 :             }
     592                 :             None => { // every spec database is expected to exist by this point
     593                 :                 bail!(
     594                 :                     "database {} doesn't exist in Postgres after handle_databases()",
     595                 :                     db.name
     596                 :                 );
     597                 :             }
     598                 :         }
     599                 : 
     600                 :         let mut conf = Config::from_str(connstr)?; // start from the caller-supplied connection string...
     601                 :         conf.dbname(&db.name); // ...but point it at this particular database
     602                 : 
     603                 :         let mut db_client = conf.connect(NoTls)?; // dedicated connection for this database, opened with NoTls
     604                 : 
     605                 :         // This only changes ownership of the schema itself, not of the objects
     606                 :         // inside it. Without it the owner of the `public` schema stays `cloud_admin`,
     607                 :         // and the database owner cannot do anything with it. The DO block makes this
     608                 :         // a no-op when schema `public` doesn't exist or has some other owner.
     609                 :         let alter_query = format!(
     610                 :             "DO $$\n\
     611                 :                 DECLARE\n\
     612                 :                     schema_owner TEXT;\n\
     613                 :                 BEGIN\n\
     614                 :                     IF EXISTS(\n\
     615                 :                         SELECT nspname\n\
     616                 :                         FROM pg_catalog.pg_namespace\n\
     617                 :                         WHERE nspname = 'public'\n\
     618                 :                     )\n\
     619                 :                     THEN\n\
     620                 :                         SELECT nspowner::regrole::text\n\
     621                 :                             FROM pg_catalog.pg_namespace\n\
     622                 :                             WHERE nspname = 'public'\n\
     623                 :                             INTO schema_owner;\n\
     624                 :                 \n\
     625                 :                         IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
     626                 :                         THEN\n\
     627                 :                             ALTER SCHEMA public OWNER TO {};\n\
     628                 :                         END IF;\n\
     629                 :                     END IF;\n\
     630                 :                 END\n\
     631                 :             $$;",
     632                 :             db.owner.pg_quote()
     633                 :         );
     634                 :         db_client.simple_query(&alter_query)?;
     635                 : 
     636                 :         // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
     637                 :         // This is needed because since postgres 15 this privilege is removed by default.
     638                 :         // TODO: web_access hasn't been created for almost 1 year. It could be that we have
     639                 :         // active users of 1 year old projects, but hopefully not, so check it and
     640                 :         // remove this code if possible. The worst thing that could happen is that
     641                 :         // the user won't be able to use the public schema in NEW databases created in
     642                 :         // a very OLD project.
     643                 :         let grant_query = "DO $$\n\
     644                 :                 BEGIN\n\
     645                 :                     IF EXISTS(\n\
     646                 :                         SELECT nspname\n\
     647                 :                         FROM pg_catalog.pg_namespace\n\
     648                 :                         WHERE nspname = 'public'\n\
     649                 :                     ) AND\n\
     650                 :                     current_setting('server_version_num')::int/10000 >= 15\n\
     651                 :                     THEN\n\
     652                 :                         IF EXISTS(\n\
     653                 :                             SELECT rolname\n\
     654                 :                             FROM pg_catalog.pg_roles\n\
     655                 :                             WHERE rolname = 'web_access'\n\
     656                 :                         )\n\
     657                 :                         THEN\n\
     658                 :                             GRANT CREATE ON SCHEMA public TO web_access;\n\
     659                 :                         END IF;\n\
     660                 :                     END IF;\n\
     661                 :                 END\n\
     662                 :             $$;"
     663                 :         .to_string();
     664                 : 
     665               0 :         info!("grant query for db {} : {}", &db.name, &grant_query);
     666                 :         db_client.simple_query(&grant_query)?;
     667                 :     }
     668                 : 
     669                 :     Ok(())
     670                 : }
     671                 : 
     672                 : /// Create required system extensions: currently only `pg_stat_statements`, and only when the spec's `shared_preload_libraries` setting mentions it.
     673 CBC         221 : #[instrument(skip_all)]
     674                 : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     675                 :     if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { // nothing to do when the setting is absent
     676                 :         if libs.contains("pg_stat_statements") {
     677                 :             // Create the extension only if this compute really needs it
     678                 :             let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
     679 UBC           0 :             info!("creating system extensions with query: {}", query);
     680                 :             client.simple_query(query)?;
     681                 :         }
     682                 :     }
     683                 : 
     684                 :     Ok(())
     685                 : }
     686                 : 
     687                 : /// Make sure the `neon` extension lives in schema `neon` and is up to date: create the schema and the extension if missing, mark the extension relocatable, move it to schema `neon`, then run ALTER EXTENSION neon UPDATE. The first failing statement aborts the sequence.
     688 CBC         221 : #[instrument(skip_all)]
     689                 : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
     690             221 :     info!("handle extension neon");
     691                 : 
     692                 :     let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
     693                 :     client.simple_query(query)?;
     694                 : 
     695                 :     query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
     696             221 :     info!("create neon extension with query: {}", query);
     697                 :     client.simple_query(query)?;
     698                 : 
     699                 :     query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'"; // SET SCHEMA below only works on a relocatable extension
     700                 :     client.simple_query(query)?;
     701                 : 
     702                 :     query = "ALTER EXTENSION neon SET SCHEMA neon"; // relocate the extension into schema `neon`
     703             221 :     info!("alter neon extension schema with query: {}", query);
     704                 :     client.simple_query(query)?;
     705                 : 
     706                 :     // This will be a no-op if the extension is already up to date,
     707                 :     // which may happen in two cases:
     708                 :     // - the extension was just installed
     709                 :     // - the extension was already installed and is up to date
     710                 :     let query = "ALTER EXTENSION neon UPDATE"; // new binding that shadows the mutable `query` above
     711             221 :     info!("update neon extension schema with query: {}", query);
     712                 :     client.simple_query(query)?;
     713                 : 
     714                 :     Ok(())
     715                 : }
        

Generated by: LCOV version 2.1-beta