LCOV - code coverage report
Current view: top level - compute_tools/src - spec.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 29.3 % 174 51
Test Date: 2024-02-07 07:37:29 Functions: 50.0 % 58 29

            Line data    Source code
       1              : use std::fs::File;
       2              : use std::path::Path;
       3              : use std::str::FromStr;
       4              : 
       5              : use anyhow::{anyhow, bail, Result};
       6              : use postgres::config::Config;
       7              : use postgres::{Client, NoTls};
       8              : use reqwest::StatusCode;
       9              : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
      10              : 
      11              : use crate::config;
      12              : use crate::logger::inlinify;
      13              : use crate::params::PG_HBA_ALL_MD5;
      14              : use crate::pg_helpers::*;
      15              : 
      16              : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
      17              : use compute_api::spec::{ComputeSpec, PgIdent, Role};
      18              : 
      19              : // Do control plane request and return response if any. In case of error it
      20              : // returns a bool flag indicating whether it makes sense to retry the request
      21              : // and a string with error message.
      22            0 : fn do_control_plane_request(
      23            0 :     uri: &str,
      24            0 :     jwt: &str,
      25            0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
      26            0 :     let resp = reqwest::blocking::Client::new()
      27            0 :         .get(uri)
      28            0 :         .header("Authorization", format!("Bearer {}", jwt))
      29            0 :         .send()
      30            0 :         .map_err(|e| {
      31            0 :             (
      32            0 :                 true,
      33            0 :                 format!("could not perform spec request to control plane: {}", e),
      34            0 :             )
      35            0 :         })?;
      36              : 
      37            0 :     match resp.status() {
      38            0 :         StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
      39            0 :             Ok(spec_resp) => Ok(spec_resp),
      40            0 :             Err(e) => Err((
      41            0 :                 true,
      42            0 :                 format!("could not deserialize control plane response: {}", e),
      43            0 :             )),
      44              :         },
      45              :         StatusCode::SERVICE_UNAVAILABLE => {
      46            0 :             Err((true, "control plane is temporarily unavailable".to_string()))
      47              :         }
      48              :         StatusCode::BAD_GATEWAY => {
      49              :             // We have a problem with intermittent 502 errors now
      50              :             // https://github.com/neondatabase/cloud/issues/2353
      51              :             // It's fine to retry GET request in this case.
      52            0 :             Err((true, "control plane request failed with 502".to_string()))
      53              :         }
      54              :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      55              :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      56            0 :         _ => Err((
      57            0 :             false,
      58            0 :             format!(
      59            0 :                 "unexpected control plane response status code: {}",
      60            0 :                 resp.status()
      61            0 :             ),
      62            0 :         )),
      63              :     }
      64            0 : }
      65              : 
      66              : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
      67              : /// env variable is set, it will be used for authorization.
      68            0 : pub fn get_spec_from_control_plane(
      69            0 :     base_uri: &str,
      70            0 :     compute_id: &str,
      71            0 : ) -> Result<Option<ComputeSpec>> {
      72            0 :     let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
      73            0 :     let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
      74            0 :         Ok(v) => v,
      75            0 :         Err(_) => "".to_string(),
      76              :     };
      77            0 :     let mut attempt = 1;
      78            0 :     let mut spec: Result<Option<ComputeSpec>> = Ok(None);
      79            0 : 
      80            0 :     info!("getting spec from control plane: {}", cp_uri);
      81              : 
      82              :     // Do 3 attempts to get spec from the control plane using the following logic:
      83              :     // - network error -> then retry
      84              :     // - compute id is unknown or any other error -> bail out
      85              :     // - no spec for compute yet (Empty state) -> return Ok(None)
      86              :     // - got spec -> return Ok(Some(spec))
      87            0 :     while attempt < 4 {
      88            0 :         spec = match do_control_plane_request(&cp_uri, &jwt) {
      89            0 :             Ok(spec_resp) => match spec_resp.status {
      90            0 :                 ControlPlaneComputeStatus::Empty => Ok(None),
      91              :                 ControlPlaneComputeStatus::Attached => {
      92            0 :                     if let Some(spec) = spec_resp.spec {
      93            0 :                         Ok(Some(spec))
      94              :                     } else {
      95            0 :                         bail!("compute is attached, but spec is empty")
      96              :                     }
      97              :                 }
      98              :             },
      99            0 :             Err((retry, msg)) => {
     100            0 :                 if retry {
     101            0 :                     Err(anyhow!(msg))
     102              :                 } else {
     103            0 :                     bail!(msg);
     104              :                 }
     105              :             }
     106              :         };
     107              : 
     108            0 :         if let Err(e) = &spec {
     109            0 :             error!("attempt {} to get spec failed with: {}", attempt, e);
     110              :         } else {
     111            0 :             return spec;
     112              :         }
     113              : 
     114            0 :         attempt += 1;
     115            0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     116              :     }
     117              : 
     118              :     // All attempts failed, return error.
     119            0 :     spec
     120            0 : }
     121              : 
     122              : /// Check `pg_hba.conf` and update if needed to allow external connections.
     123          575 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     124          575 :     // XXX: consider making it a part of spec.json
     125          575 :     info!("checking pg_hba.conf");
     126          575 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     127          575 : 
     128          575 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     129          575 :         info!("updated pg_hba.conf to allow external connections");
     130              :     } else {
     131            0 :         info!("pg_hba.conf is up-to-date");
     132              :     }
     133              : 
     134          575 :     Ok(())
     135          575 : }
     136              : 
     137              : /// Create a standby.signal file
     138           49 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     139           49 :     // XXX: consider making it a part of spec.json
     140           49 :     info!("adding standby.signal");
     141           49 :     let signalfile = pgdata_path.join("standby.signal");
     142           49 : 
     143           49 :     if !signalfile.exists() {
     144           49 :         info!("created standby.signal");
     145           49 :         File::create(signalfile)?;
     146              :     } else {
     147            0 :         info!("reused pre-existing standby.signal");
     148              :     }
     149           49 :     Ok(())
     150           49 : }
     151              : 
     152              : /// Compute could be unexpectedly shut down, for example, during the
     153              : /// database dropping. This leaves the database in the invalid state,
     154              : /// which prevents new db creation with the same name. This function
     155              : /// will clean it up before proceeding with catalog updates. All
     156              : /// possible future cleanup operations may go here too.
     157          229 : #[instrument(skip_all)]
     158              : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
     159              :     let existing_dbs = get_existing_dbs(client)?;
     160              : 
     161              :     for (_, db) in existing_dbs {
     162              :         if db.invalid {
     163              :             // After recent commit in Postgres, interrupted DROP DATABASE
     164              :             // leaves the database in the invalid state. According to the
     165              :             // commit message, the only option for user is to drop it again.
     166              :             // See:
     167              :             //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     168              :             //
     169              :             // Postgres Neon extension is done the way, that db is de-registered
     170              :             // in the control plane metadata only after it is dropped. So there is
     171              :             // a chance that it still thinks that db should exist. This means
     172              :             // that it will be re-created by `handle_databases()`. Yet, it's fine
     173              :             // as user can just repeat drop (in vanilla Postgres they would need
     174              :             // to do the same, btw).
     175              :             let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
     176            1 :             info!("dropping invalid database {}", db.name);
     177              :             client.execute(query.as_str(), &[])?;
     178              :         }
     179              :     }
     180              : 
     181              :     Ok(())
     182              : }
     183              : 
     184              : /// Given a cluster spec json and open transaction it handles roles creation,
     185              : /// deletion and update.
     186          229 : #[instrument(skip_all)]
     187              : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     188              :     let mut xact = client.transaction()?;
     189              :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     190              : 
     191              :     // Print a list of existing Postgres roles (only in debug mode)
     192          229 :     if span_enabled!(Level::INFO) {
     193              :         let mut vec = Vec::new();
     194              :         for r in &existing_roles {
     195              :             vec.push(format!(
     196              :                 "{}:{}",
     197              :                 r.name,
     198              :                 if r.encrypted_password.is_some() {
     199              :                     "[FILTERED]"
     200              :                 } else {
     201              :                     "(null)"
     202              :                 }
     203              :             ));
     204              :         }
     205              : 
     206          229 :         info!("postgres roles (total {}): {:?}", vec.len(), vec);
     207              :     }
     208              : 
     209              :     // Process delta operations first
     210              :     if let Some(ops) = &spec.delta_operations {
     211            0 :         info!("processing role renames");
     212              :         for op in ops {
     213              :             match op.action.as_ref() {
     214              :                 "delete_role" => {
     215              :                     // no-op now, roles will be deleted at the end of configuration
     216              :                 }
     217              :                 // Renaming role drops its password, since role name is
     218              :                 // used as a salt there.  It is important that this role
     219              :                 // is recorded with a new `name` in the `roles` list.
     220              :                 // Follow up roles update will set the new password.
     221              :                 "rename_role" => {
     222              :                     let new_name = op.new_name.as_ref().unwrap();
     223              : 
     224              :                     // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     225            0 :                     if existing_roles.iter().any(|r| r.name == op.name) {
     226              :                         let query: String = format!(
     227              :                             "ALTER ROLE {} RENAME TO {}",
     228              :                             op.name.pg_quote(),
     229              :                             new_name.pg_quote()
     230              :                         );
     231              : 
     232            0 :                         warn!("renaming role '{}' to '{}'", op.name, new_name);
     233              :                         xact.execute(query.as_str(), &[])?;
     234              :                     }
     235              :                 }
     236              :                 _ => {}
     237              :             }
     238              :         }
     239              :     }
     240              : 
     241              :     // Refresh Postgres roles info to handle possible roles renaming
     242              :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     243              : 
     244          229 :     info!(
     245          229 :         "handling cluster spec roles (total {})",
     246          229 :         spec.cluster.roles.len()
     247          229 :     );
     248              :     for role in &spec.cluster.roles {
     249              :         let name = &role.name;
     250              :         // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     251            0 :         let pg_role = existing_roles.iter().find(|r| r.name == *name);
     252              : 
     253              :         enum RoleAction {
     254              :             None,
     255              :             Update,
     256              :             Create,
     257              :         }
     258              :         let action = if let Some(r) = pg_role {
     259              :             if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
     260              :                 || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
     261              :             {
     262              :                 RoleAction::Update
     263              :             } else if let Some(pg_pwd) = &r.encrypted_password {
     264              :                 // Check whether password changed or not (trim 'md5' prefix first if any)
     265              :                 //
     266              :                 // This is a backward compatibility hack, which comes from the times when we were using
     267              :                 // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
     268              :                 // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
     269              :                 // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
     270              :                 // Here is the only place so far where we compare hashes, so it seems to be the best candidate
     271              :                 // to place this compatibility layer.
     272              :                 let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
     273              :                     stripped
     274              :                 } else {
     275              :                     pg_pwd
     276              :                 };
     277              :                 if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
     278              :                     RoleAction::Update
     279              :                 } else {
     280              :                     RoleAction::None
     281              :                 }
     282              :             } else {
     283              :                 RoleAction::None
     284              :             }
     285              :         } else {
     286              :             RoleAction::Create
     287              :         };
     288              : 
     289              :         match action {
     290              :             RoleAction::None => {}
     291              :             RoleAction::Update => {
     292              :                 // This can be run on /every/ role! Not just ones created through the console.
     293              :                 // This means that if you add some funny ALTER here that adds a permission,
     294              :                 // this will get run even on user-created roles! This will result in different
     295              :                 // behavior before and after a spec gets reapplied. The below ALTER as it stands
     296              :                 // now only grants LOGIN and changes the password. Please do not allow this branch
     297              :                 // to do anything silly.
     298              :                 let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
     299              :                 query.push_str(&role.to_pg_options());
     300              :                 xact.execute(query.as_str(), &[])?;
     301              :             }
     302              :             RoleAction::Create => {
     303              :                 // This branch only runs when roles are created through the console, so it is
     304              :                 // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
     305              :                 // from neon_superuser.
     306              :                 let mut query: String = format!(
     307              :                     "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
     308              :                     name.pg_quote()
     309              :                 );
     310            0 :                 info!("running role create query: '{}'", &query);
     311              :                 query.push_str(&role.to_pg_options());
     312              :                 xact.execute(query.as_str(), &[])?;
     313              :             }
     314              :         }
     315              : 
     316            0 :         if span_enabled!(Level::INFO) {
     317              :             let pwd = if role.encrypted_password.is_some() {
     318              :                 "[FILTERED]"
     319              :             } else {
     320              :                 "(null)"
     321              :             };
     322              :             let action_str = match action {
     323              :                 RoleAction::None => "",
     324              :                 RoleAction::Create => " -> create",
     325              :                 RoleAction::Update => " -> update",
     326              :             };
     327            0 :             info!(" - {}:{}{}", name, pwd, action_str);
     328              :         }
     329              :     }
     330              : 
     331              :     xact.commit()?;
     332              : 
     333              :     Ok(())
     334              : }
     335              : 
     336              : /// Reassign all dependent objects and delete requested roles.
     337          229 : #[instrument(skip_all)]
     338              : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
     339              :     if let Some(ops) = &spec.delta_operations {
     340              :         // First, reassign all dependent objects to db owners.
     341            0 :         info!("reassigning dependent objects of to-be-deleted roles");
     342              : 
     343              :         // Fetch existing roles. We could've exported and used `existing_roles` from
     344              :         // `handle_roles()`, but we only make this list there before creating new roles.
     345              :         // Which is probably fine as we never create to-be-deleted roles, but that'd
     346              :         // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
     347              :         // buffers already, so this shouldn't be a big deal.
     348              :         let mut xact = client.transaction()?;
     349              :         let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     350              :         xact.commit()?;
     351              : 
     352              :         for op in ops {
     353              :             // Check that role is still present in Postgres, as this could be a
     354              :             // restart with the same spec after role deletion.
     355            0 :             if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
     356              :                 reassign_owned_objects(spec, connstr, &op.name)?;
     357              :             }
     358              :         }
     359              : 
     360              :         // Second, proceed with role deletions.
     361            0 :         info!("processing role deletions");
     362              :         let mut xact = client.transaction()?;
     363              :         for op in ops {
     364              :             // We do not check either role exists or not,
     365              :             // Postgres will take care of it for us
     366              :             if op.action == "delete_role" {
     367              :                 let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
     368              : 
     369            0 :                 warn!("deleting role '{}'", &op.name);
     370              :                 xact.execute(query.as_str(), &[])?;
     371              :             }
     372              :         }
     373              :         xact.commit()?;
     374              :     }
     375              : 
     376              :     Ok(())
     377              : }
     378              : 
     379            0 : fn reassign_owned_objects_in_one_db(
     380            0 :     conf: Config,
     381            0 :     role_name: &PgIdent,
     382            0 :     db_owner: &PgIdent,
     383            0 : ) -> Result<()> {
     384            0 :     let mut client = conf.connect(NoTls)?;
     385              : 
     386              :     // This will reassign all dependent objects to the db owner
     387            0 :     let reassign_query = format!(
     388            0 :         "REASSIGN OWNED BY {} TO {}",
     389            0 :         role_name.pg_quote(),
     390            0 :         db_owner.pg_quote()
     391            0 :     );
     392            0 :     info!(
     393            0 :         "reassigning objects owned by '{}' in db '{}' to '{}'",
     394            0 :         role_name,
     395            0 :         conf.get_dbname().unwrap_or(""),
     396            0 :         db_owner
     397            0 :     );
     398            0 :     client.simple_query(&reassign_query)?;
     399              : 
     400              :     // This now will only drop privileges of the role
     401            0 :     let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
     402            0 :     client.simple_query(&drop_query)?;
     403            0 :     Ok(())
     404            0 : }
     405              : 
     406              : // Reassign all owned objects in all databases to the owner of the database.
     407            0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
     408            0 :     for db in &spec.cluster.databases {
     409            0 :         if db.owner != *role_name {
     410            0 :             let mut conf = Config::from_str(connstr)?;
     411            0 :             conf.dbname(&db.name);
     412            0 :             reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
     413            0 :         }
     414              :     }
     415              : 
     416              :     // Also handle case when there are no databases in the spec.
     417              :     // In this case we need to reassign objects in the default database.
     418            0 :     let conf = Config::from_str(connstr)?;
     419            0 :     let db_owner = PgIdent::from_str("cloud_admin")?;
     420            0 :     reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
     421              : 
     422            0 :     Ok(())
     423            0 : }
     424              : 
     425              : /// It follows mostly the same logic as `handle_roles()` excepting that we
     426              : /// does not use an explicit transactions block, since major database operations
     427              : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
     428              : /// atomicity should be enough here due to the order of operations and various checks,
     429              : /// which together provide us idempotency.
     430          229 : #[instrument(skip_all)]
     431              : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     432              :     let existing_dbs = get_existing_dbs(client)?;
     433              : 
     434              :     // Print a list of existing Postgres databases (only in debug mode)
     435          229 :     if span_enabled!(Level::INFO) {
     436              :         let mut vec = Vec::new();
     437              :         for (dbname, db) in &existing_dbs {
     438              :             vec.push(format!("{}:{}", dbname, db.owner));
     439              :         }
     440          229 :         info!("postgres databases (total {}): {:?}", vec.len(), vec);
     441              :     }
     442              : 
     443              :     // Process delta operations first
     444              :     if let Some(ops) = &spec.delta_operations {
     445            0 :         info!("processing delta operations on databases");
     446              :         for op in ops {
     447              :             match op.action.as_ref() {
     448              :                 // We do not check either DB exists or not,
     449              :                 // Postgres will take care of it for us
     450              :                 "delete_db" => {
     451              :                     // In Postgres we can't drop a database if it is a template.
     452              :                     // So we need to unset the template flag first, but it could
     453              :                     // be a retry, so we could've already dropped the database.
     454              :                     // Check that database exists first to make it idempotent.
     455              :                     let unset_template_query: String = format!(
     456              :                         "
     457              :                         DO $$
     458              :                         BEGIN
     459              :                             IF EXISTS(
     460              :                                 SELECT 1
     461              :                                 FROM pg_catalog.pg_database
     462              :                                 WHERE datname = {}
     463              :                             )
     464              :                             THEN
     465              :                             ALTER DATABASE {} is_template false;
     466              :                             END IF;
     467              :                         END
     468              :                         $$;",
     469              :                         escape_literal(&op.name),
     470              :                         &op.name.pg_quote()
     471              :                     );
     472              :                     // Use FORCE to drop database even if there are active connections.
     473              :                     // We run this from `cloud_admin`, so it should have enough privileges.
     474              :                     // NB: there could be other db states, which prevent us from dropping
     475              :                     // the database. For example, if db is used by any active subscription
     476              :                     // or replication slot.
     477              :                     // TODO: deal with it once we allow logical replication. Proper fix should
     478              :                     // involve returning an error code to the control plane, so it could
     479              :                     // figure out that this is a non-retryable error, return it to the user
     480              :                     // and fail operation permanently.
     481              :                     let drop_db_query: String = format!(
     482              :                         "DROP DATABASE IF EXISTS {} WITH (FORCE)",
     483              :                         &op.name.pg_quote()
     484              :                     );
     485              : 
     486            0 :                     warn!("deleting database '{}'", &op.name);
     487              :                     client.execute(unset_template_query.as_str(), &[])?;
     488              :                     client.execute(drop_db_query.as_str(), &[])?;
     489              :                 }
     490              :                 "rename_db" => {
     491              :                     let new_name = op.new_name.as_ref().unwrap();
     492              : 
     493              :                     if existing_dbs.get(&op.name).is_some() {
     494              :                         let query: String = format!(
     495              :                             "ALTER DATABASE {} RENAME TO {}",
     496              :                             op.name.pg_quote(),
     497              :                             new_name.pg_quote()
     498              :                         );
     499              : 
     500            0 :                         warn!("renaming database '{}' to '{}'", op.name, new_name);
     501              :                         client.execute(query.as_str(), &[])?;
     502              :                     }
     503              :                 }
     504              :                 _ => {}
     505              :             }
     506              :         }
     507              :     }
     508              : 
     509              :     // Refresh Postgres databases info to handle possible renames
     510              :     let existing_dbs = get_existing_dbs(client)?;
     511              : 
     512          229 :     info!(
     513          229 :         "handling cluster spec databases (total {})",
     514          229 :         spec.cluster.databases.len()
     515          229 :     );
     516              :     for db in &spec.cluster.databases {
     517              :         let name = &db.name;
     518              :         let pg_db = existing_dbs.get(name);
     519              : 
     520              :         enum DatabaseAction {
     521              :             None,
     522              :             Update,
     523              :             Create,
     524              :         }
     525              :         let action = if let Some(r) = pg_db {
     526              :             // XXX: db owner name is returned as quoted string from Postgres,
     527              :             // when quoting is needed.
     528              :             let new_owner = if r.owner.starts_with('"') {
     529              :                 db.owner.pg_quote()
     530              :             } else {
     531              :                 db.owner.clone()
     532              :             };
     533              : 
     534              :             if new_owner != r.owner {
     535              :                 // Update the owner
     536              :                 DatabaseAction::Update
     537              :             } else {
     538              :                 DatabaseAction::None
     539              :             }
     540              :         } else {
     541              :             DatabaseAction::Create
     542              :         };
     543              : 
     544              :         match action {
     545              :             DatabaseAction::None => {}
     546              :             DatabaseAction::Update => {
     547              :                 let query: String = format!(
     548              :                     "ALTER DATABASE {} OWNER TO {}",
     549              :                     name.pg_quote(),
     550              :                     db.owner.pg_quote()
     551              :                 );
     552              :                 let _guard = info_span!("executing", query).entered();
     553              :                 client.execute(query.as_str(), &[])?;
     554              :             }
     555              :             DatabaseAction::Create => {
     556              :                 let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
     557              :                 query.push_str(&db.to_pg_options());
     558              :                 let _guard = info_span!("executing", query).entered();
     559              :                 client.execute(query.as_str(), &[])?;
     560              :                 let grant_query: String = format!(
     561              :                     "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
     562              :                     name.pg_quote()
     563              :                 );
     564              :                 client.execute(grant_query.as_str(), &[])?;
     565              :             }
     566              :         };
     567              : 
     568            0 :         if span_enabled!(Level::INFO) {
     569              :             let action_str = match action {
     570              :                 DatabaseAction::None => "",
     571              :                 DatabaseAction::Create => " -> create",
     572              :                 DatabaseAction::Update => " -> update",
     573              :             };
     574            0 :             info!(" - {}:{}{}", db.name, db.owner, action_str);
     575              :         }
     576              :     }
     577              : 
     578              :     Ok(())
     579              : }
     580              : 
     581              : /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
     582              : /// to allow users creating trusted extensions and re-creating `public` schema, for example.
     583          229 : #[instrument(skip_all)]
     584              : pub fn handle_grants(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
     585          229 :     info!("modifying database permissions");
     586              :     let existing_dbs = get_existing_dbs(client)?;
     587              : 
     588              :     // Do some per-database access adjustments. We'd better do this at db creation time,
     589              :     // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
     590              :     // atomically.
     591              :     for db in &spec.cluster.databases {
     592              :         match existing_dbs.get(&db.name) {
     593              :             Some(pg_db) => {
     594              :                 if pg_db.restrict_conn || pg_db.invalid {
     595            0 :                     info!(
     596            0 :                         "skipping grants for db {} (invalid: {}, connections not allowed: {})",
     597            0 :                         db.name, pg_db.invalid, pg_db.restrict_conn
     598            0 :                     );
     599              :                     continue;
     600              :                 }
     601              :             }
     602              :             None => {
     603              :                 bail!(
     604              :                     "database {} doesn't exist in Postgres after handle_databases()",
     605              :                     db.name
     606              :                 );
     607              :             }
     608              :         }
     609              : 
     610              :         let mut conf = Config::from_str(connstr)?;
     611              :         conf.dbname(&db.name);
     612              : 
     613              :         let mut db_client = conf.connect(NoTls)?;
     614              : 
     615              :         // This will only change ownership on the schema itself, not the objects
     616              :         // inside it. Without it owner of the `public` schema will be `cloud_admin`
     617              :         // and database owner cannot do anything with it. SQL procedure ensures
     618              :         // that it won't error out if schema `public` doesn't exist.
     619              :         let alter_query = format!(
     620              :             "DO $$\n\
     621              :                 DECLARE\n\
     622              :                     schema_owner TEXT;\n\
     623              :                 BEGIN\n\
     624              :                     IF EXISTS(\n\
     625              :                         SELECT nspname\n\
     626              :                         FROM pg_catalog.pg_namespace\n\
     627              :                         WHERE nspname = 'public'\n\
     628              :                     )\n\
     629              :                     THEN\n\
     630              :                         SELECT nspowner::regrole::text\n\
     631              :                             FROM pg_catalog.pg_namespace\n\
     632              :                             WHERE nspname = 'public'\n\
     633              :                             INTO schema_owner;\n\
     634              :                 \n\
     635              :                         IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
     636              :                         THEN\n\
     637              :                             ALTER SCHEMA public OWNER TO {};\n\
     638              :                         END IF;\n\
     639              :                     END IF;\n\
     640              :                 END\n\
     641              :             $$;",
     642              :             db.owner.pg_quote()
     643              :         );
     644              :         db_client.simple_query(&alter_query)?;
     645              : 
     646              :         // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
     647              :         // This is needed because since postgres 15 this privilege is removed by default.
     648              :         // TODO: web_access isn't created for almost 1 year. It could be that we have
     649              :         // active users of 1 year old projects, but hopefully not, so check it and
     650              :         // remove this code if possible. The worst thing that could happen is that
     651              :         // user won't be able to use public schema in NEW databases created in the
     652              :         // very OLD project.
     653              :         let grant_query = "DO $$\n\
     654              :                 BEGIN\n\
     655              :                     IF EXISTS(\n\
     656              :                         SELECT nspname\n\
     657              :                         FROM pg_catalog.pg_namespace\n\
     658              :                         WHERE nspname = 'public'\n\
     659              :                     ) AND\n\
     660              :                     current_setting('server_version_num')::int/10000 >= 15\n\
     661              :                     THEN\n\
     662              :                         IF EXISTS(\n\
     663              :                             SELECT rolname\n\
     664              :                             FROM pg_catalog.pg_roles\n\
     665              :                             WHERE rolname = 'web_access'\n\
     666              :                         )\n\
     667              :                         THEN\n\
     668              :                             GRANT CREATE ON SCHEMA public TO web_access;\n\
     669              :                         END IF;\n\
     670              :                     END IF;\n\
     671              :                 END\n\
     672              :             $$;"
     673              :         .to_string();
     674              : 
     675            0 :         info!(
     676            0 :             "grant query for db {} : {}",
     677            0 :             &db.name,
     678            0 :             inlinify(&grant_query)
     679            0 :         );
     680              :         db_client.simple_query(&grant_query)?;
     681              :     }
     682              : 
     683              :     Ok(())
     684              : }
     685              : 
     686              : /// Create required system extensions
     687          229 : #[instrument(skip_all)]
     688              : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     689              :     if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
     690              :         if libs.contains("pg_stat_statements") {
     691              :             // Create extension only if this compute really needs it
     692              :             let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
     693            0 :             info!("creating system extensions with query: {}", query);
     694              :             client.simple_query(query)?;
     695              :         }
     696              :     }
     697              : 
     698              :     Ok(())
     699              : }
     700              : 
     701              : /// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
     702          229 : #[instrument(skip_all)]
     703              : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
     704          229 :     info!("handle extension neon");
     705              : 
     706              :     let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
     707              :     client.simple_query(query)?;
     708              : 
     709              :     query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
     710          229 :     info!("create neon extension with query: {}", query);
     711              :     client.simple_query(query)?;
     712              : 
     713              :     query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
     714              :     client.simple_query(query)?;
     715              : 
     716              :     query = "ALTER EXTENSION neon SET SCHEMA neon";
     717          229 :     info!("alter neon extension schema with query: {}", query);
     718              :     client.simple_query(query)?;
     719              : 
     720              :     // this will be a no-op if extension is already up to date,
     721              :     // which may happen in two cases:
     722              :     // - extension was just installed
     723              :     // - extension was already installed and is up to date
     724              :     let query = "ALTER EXTENSION neon UPDATE";
     725          229 :     info!("update neon extension schema with query: {}", query);
     726              :     client.simple_query(query)?;
     727              : 
     728              :     Ok(())
     729              : }
     730              : 
     731            4 : #[instrument(skip_all)]
     732              : pub fn handle_migrations(client: &mut Client) -> Result<()> {
     733            4 :     info!("handle migrations");
     734              : 
     735              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     736              :     // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
     737              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     738              : 
     739              :     let migrations = [
     740              :         "ALTER ROLE neon_superuser BYPASSRLS",
     741              :         r#"
     742              : DO $$
     743              : DECLARE
     744              :     role_name text;
     745              : BEGIN
     746              :     FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, 'neon_superuser', 'member')
     747              :     LOOP
     748              :         RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
     749              :         EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
     750              :     END LOOP;
     751              : 
     752              :     FOR role_name IN SELECT rolname FROM pg_roles
     753              :         WHERE
     754              :             NOT pg_has_role(rolname, 'neon_superuser', 'member') AND NOT starts_with(rolname, 'pg_')
     755              :     LOOP
     756              :         RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
     757              :         EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
     758              :     END LOOP;
     759              : END $$;
     760              : "#,
     761              :         r#"
     762              : DO $$
     763              : BEGIN
     764              :     IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
     765              :         EXECUTE 'GRANT pg_create_subscription TO neon_superuser';
     766              :     END IF;
     767              : END
     768              : $$;"#,
     769              :     ];
     770              : 
     771              :     let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
     772              :     client.simple_query(query)?;
     773              : 
     774              :     query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
     775              :     client.simple_query(query)?;
     776              : 
     777              :     query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
     778              :     client.simple_query(query)?;
     779              : 
     780              :     query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
     781              :     client.simple_query(query)?;
     782              : 
     783              :     query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
     784              :     client.simple_query(query)?;
     785              : 
     786              :     query = "SELECT id FROM neon_migration.migration_id";
     787              :     let row = client.query_one(query, &[])?;
     788              :     let mut current_migration: usize = row.get::<&str, i64>("id") as usize;
     789              :     let starting_migration_id = current_migration;
     790              : 
     791              :     query = "BEGIN";
     792              :     client.simple_query(query)?;
     793              : 
     794              :     while current_migration < migrations.len() {
     795            9 :         info!("Running migration:\n{}\n", migrations[current_migration]);
     796              :         client.simple_query(migrations[current_migration])?;
     797              :         current_migration += 1;
     798              :     }
     799              :     let setval = format!(
     800              :         "UPDATE neon_migration.migration_id SET id={}",
     801              :         migrations.len()
     802              :     );
     803              :     client.simple_query(&setval)?;
     804              : 
     805              :     query = "COMMIT";
     806              :     client.simple_query(query)?;
     807              : 
     808            4 :     info!(
     809            4 :         "Ran {} migrations",
     810            4 :         (migrations.len() - starting_migration_id)
     811            4 :     );
     812              :     Ok(())
     813              : }
        

Generated by: LCOV version 2.1-beta