LCOV - code coverage report
Current view: top level - compute_tools/src - spec.rs (source / functions) Coverage Total Hit
Test: 53437f7e869ac68c86c7d3e4c20964c0156f158c.info Lines: 0.0 % 127 0
Test Date: 2024-09-20 16:14:12 Functions: 0.0 % 20 0

            Line data    Source code
       1              : use std::fs::File;
       2              : use std::path::Path;
       3              : use std::str::FromStr;
       4              : 
       5              : use anyhow::{anyhow, bail, Context, Result};
       6              : use postgres::config::Config;
       7              : use postgres::{Client, NoTls};
       8              : use reqwest::StatusCode;
       9              : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
      10              : 
      11              : use crate::config;
      12              : use crate::logger::inlinify;
      13              : use crate::migration::MigrationRunner;
      14              : use crate::params::PG_HBA_ALL_MD5;
      15              : use crate::pg_helpers::*;
      16              : 
      17              : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
      18              : use compute_api::spec::{ComputeSpec, PgIdent, Role};
      19              : 
      20              : // Do control plane request and return response if any. In case of error it
      21              : // returns a bool flag indicating whether it makes sense to retry the request
      22              : // and a string with error message.
      23            0 : fn do_control_plane_request(
      24            0 :     uri: &str,
      25            0 :     jwt: &str,
      26            0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
      27            0 :     let resp = reqwest::blocking::Client::new()
      28            0 :         .get(uri)
      29            0 :         .header("Authorization", format!("Bearer {}", jwt))
      30            0 :         .send()
      31            0 :         .map_err(|e| {
      32            0 :             (
      33            0 :                 true,
      34            0 :                 format!("could not perform spec request to control plane: {}", e),
      35            0 :             )
      36            0 :         })?;
      37              : 
      38            0 :     match resp.status() {
      39            0 :         StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
      40            0 :             Ok(spec_resp) => Ok(spec_resp),
      41            0 :             Err(e) => Err((
      42            0 :                 true,
      43            0 :                 format!("could not deserialize control plane response: {}", e),
      44            0 :             )),
      45              :         },
      46              :         StatusCode::SERVICE_UNAVAILABLE => {
      47            0 :             Err((true, "control plane is temporarily unavailable".to_string()))
      48              :         }
      49              :         StatusCode::BAD_GATEWAY => {
      50              :             // We have a problem with intermittent 502 errors now
      51              :             // https://github.com/neondatabase/cloud/issues/2353
      52              :             // It's fine to retry GET request in this case.
      53            0 :             Err((true, "control plane request failed with 502".to_string()))
      54              :         }
      55              :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      56              :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      57            0 :         _ => Err((
      58            0 :             false,
      59            0 :             format!(
      60            0 :                 "unexpected control plane response status code: {}",
      61            0 :                 resp.status()
      62            0 :             ),
      63            0 :         )),
      64              :     }
      65            0 : }
      66              : 
      67              : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
      68              : /// env variable is set, it will be used for authorization.
      69            0 : pub fn get_spec_from_control_plane(
      70            0 :     base_uri: &str,
      71            0 :     compute_id: &str,
      72            0 : ) -> Result<Option<ComputeSpec>> {
      73            0 :     let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
      74            0 :     let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
      75            0 :         Ok(v) => v,
      76            0 :         Err(_) => "".to_string(),
      77              :     };
      78            0 :     let mut attempt = 1;
      79            0 :     let mut spec: Result<Option<ComputeSpec>> = Ok(None);
      80            0 : 
      81            0 :     info!("getting spec from control plane: {}", cp_uri);
      82              : 
      83              :     // Do 3 attempts to get spec from the control plane using the following logic:
      84              :     // - network error -> then retry
      85              :     // - compute id is unknown or any other error -> bail out
      86              :     // - no spec for compute yet (Empty state) -> return Ok(None)
      87              :     // - got spec -> return Ok(Some(spec))
      88            0 :     while attempt < 4 {
      89            0 :         spec = match do_control_plane_request(&cp_uri, &jwt) {
      90            0 :             Ok(spec_resp) => match spec_resp.status {
      91            0 :                 ControlPlaneComputeStatus::Empty => Ok(None),
      92              :                 ControlPlaneComputeStatus::Attached => {
      93            0 :                     if let Some(spec) = spec_resp.spec {
      94            0 :                         Ok(Some(spec))
      95              :                     } else {
      96            0 :                         bail!("compute is attached, but spec is empty")
      97              :                     }
      98              :                 }
      99              :             },
     100            0 :             Err((retry, msg)) => {
     101            0 :                 if retry {
     102            0 :                     Err(anyhow!(msg))
     103              :                 } else {
     104            0 :                     bail!(msg);
     105              :                 }
     106              :             }
     107              :         };
     108              : 
     109            0 :         if let Err(e) = &spec {
     110            0 :             error!("attempt {} to get spec failed with: {}", attempt, e);
     111              :         } else {
     112            0 :             return spec;
     113              :         }
     114              : 
     115            0 :         attempt += 1;
     116            0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     117              :     }
     118              : 
     119              :     // All attempts failed, return error.
     120            0 :     spec
     121            0 : }
     122              : 
     123              : /// Check `pg_hba.conf` and update if needed to allow external connections.
     124            0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     125            0 :     // XXX: consider making it a part of spec.json
     126            0 :     info!("checking pg_hba.conf");
     127            0 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     128            0 : 
     129            0 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     130            0 :         info!("updated pg_hba.conf to allow external connections");
     131              :     } else {
     132            0 :         info!("pg_hba.conf is up-to-date");
     133              :     }
     134              : 
     135            0 :     Ok(())
     136            0 : }
     137              : 
     138              : /// Create a standby.signal file
     139            0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     140            0 :     // XXX: consider making it a part of spec.json
     141            0 :     info!("adding standby.signal");
     142            0 :     let signalfile = pgdata_path.join("standby.signal");
     143            0 : 
     144            0 :     if !signalfile.exists() {
     145            0 :         info!("created standby.signal");
     146            0 :         File::create(signalfile)?;
     147              :     } else {
     148            0 :         info!("reused pre-existing standby.signal");
     149              :     }
     150            0 :     Ok(())
     151            0 : }
     152              : 
     153              : /// Compute could be unexpectedly shut down, for example, during the
     154              : /// database dropping. This leaves the database in the invalid state,
     155              : /// which prevents new db creation with the same name. This function
     156              : /// will clean it up before proceeding with catalog updates. All
     157              : /// possible future cleanup operations may go here too.
     158            0 : #[instrument(skip_all)]
     159              : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
     160              :     let existing_dbs = get_existing_dbs(client)?;
     161              : 
     162              :     for (_, db) in existing_dbs {
     163              :         if db.invalid {
     164              :             // After recent commit in Postgres, interrupted DROP DATABASE
     165              :             // leaves the database in the invalid state. According to the
     166              :             // commit message, the only option for user is to drop it again.
     167              :             // See:
     168              :             //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     169              :             //
     170              :             // Postgres Neon extension is done the way, that db is de-registered
     171              :             // in the control plane metadata only after it is dropped. So there is
     172              :             // a chance that it still thinks that db should exist. This means
     173              :             // that it will be re-created by `handle_databases()`. Yet, it's fine
     174              :             // as user can just repeat drop (in vanilla Postgres they would need
     175              :             // to do the same, btw).
     176              :             let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
     177              :             info!("dropping invalid database {}", db.name);
     178              :             client.execute(query.as_str(), &[])?;
     179              :         }
     180              :     }
     181              : 
     182              :     Ok(())
     183              : }
     184              : 
     185              : /// Given a cluster spec json and open transaction it handles roles creation,
     186              : /// deletion and update.
     187            0 : #[instrument(skip_all)]
     188              : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     189              :     let mut xact = client.transaction()?;
     190              :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     191              : 
     192              :     // Print a list of existing Postgres roles (only in debug mode)
     193              :     if span_enabled!(Level::INFO) {
     194              :         let mut vec = Vec::new();
     195              :         for r in &existing_roles {
     196              :             vec.push(format!(
     197              :                 "{}:{}",
     198              :                 r.name,
     199              :                 if r.encrypted_password.is_some() {
     200              :                     "[FILTERED]"
     201              :                 } else {
     202              :                     "(null)"
     203              :                 }
     204              :             ));
     205              :         }
     206              : 
     207              :         info!("postgres roles (total {}): {:?}", vec.len(), vec);
     208              :     }
     209              : 
     210              :     // Process delta operations first
     211              :     if let Some(ops) = &spec.delta_operations {
     212              :         info!("processing role renames");
     213              :         for op in ops {
     214              :             match op.action.as_ref() {
     215              :                 "delete_role" => {
     216              :                     // no-op now, roles will be deleted at the end of configuration
     217              :                 }
     218              :                 // Renaming role drops its password, since role name is
     219              :                 // used as a salt there.  It is important that this role
     220              :                 // is recorded with a new `name` in the `roles` list.
     221              :                 // Follow up roles update will set the new password.
     222              :                 "rename_role" => {
     223              :                     let new_name = op.new_name.as_ref().unwrap();
     224              : 
     225              :                     // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     226            0 :                     if existing_roles.iter().any(|r| r.name == op.name) {
     227              :                         let query: String = format!(
     228              :                             "ALTER ROLE {} RENAME TO {}",
     229              :                             op.name.pg_quote(),
     230              :                             new_name.pg_quote()
     231              :                         );
     232              : 
     233              :                         warn!("renaming role '{}' to '{}'", op.name, new_name);
     234              :                         xact.execute(query.as_str(), &[])?;
     235              :                     }
     236              :                 }
     237              :                 _ => {}
     238              :             }
     239              :         }
     240              :     }
     241              : 
     242              :     // Refresh Postgres roles info to handle possible roles renaming
     243              :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     244              : 
     245              :     info!(
     246              :         "handling cluster spec roles (total {})",
     247              :         spec.cluster.roles.len()
     248              :     );
     249              :     for role in &spec.cluster.roles {
     250              :         let name = &role.name;
     251              :         // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     252            0 :         let pg_role = existing_roles.iter().find(|r| r.name == *name);
     253              : 
     254              :         enum RoleAction {
     255              :             None,
     256              :             Update,
     257              :             Create,
     258              :         }
     259              :         let action = if let Some(r) = pg_role {
     260              :             if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
     261              :                 || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
     262              :             {
     263              :                 RoleAction::Update
     264              :             } else if let Some(pg_pwd) = &r.encrypted_password {
     265              :                 // Check whether password changed or not (trim 'md5' prefix first if any)
     266              :                 //
     267              :                 // This is a backward compatibility hack, which comes from the times when we were using
     268              :                 // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
     269              :                 // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
     270              :                 // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
     271              :                 // Here is the only place so far where we compare hashes, so it seems to be the best candidate
     272              :                 // to place this compatibility layer.
     273              :                 let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
     274              :                     stripped
     275              :                 } else {
     276              :                     pg_pwd
     277              :                 };
     278              :                 if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
     279              :                     RoleAction::Update
     280              :                 } else {
     281              :                     RoleAction::None
     282              :                 }
     283              :             } else {
     284              :                 RoleAction::None
     285              :             }
     286              :         } else {
     287              :             RoleAction::Create
     288              :         };
     289              : 
     290              :         match action {
     291              :             RoleAction::None => {}
     292              :             RoleAction::Update => {
     293              :                 // This can be run on /every/ role! Not just ones created through the console.
     294              :                 // This means that if you add some funny ALTER here that adds a permission,
     295              :                 // this will get run even on user-created roles! This will result in different
     296              :                 // behavior before and after a spec gets reapplied. The below ALTER as it stands
     297              :                 // now only grants LOGIN and changes the password. Please do not allow this branch
     298              :                 // to do anything silly.
     299              :                 let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
     300              :                 query.push_str(&role.to_pg_options());
     301              :                 xact.execute(query.as_str(), &[])?;
     302              :             }
     303              :             RoleAction::Create => {
     304              :                 // This branch only runs when roles are created through the console, so it is
     305              :                 // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
     306              :                 // from neon_superuser.
     307              :                 let mut query: String = format!(
     308              :                     "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
     309              :                     name.pg_quote()
     310              :                 );
     311              :                 info!("running role create query: '{}'", &query);
     312              :                 query.push_str(&role.to_pg_options());
     313              :                 xact.execute(query.as_str(), &[])?;
     314              :             }
     315              :         }
     316              : 
     317              :         if span_enabled!(Level::INFO) {
     318              :             let pwd = if role.encrypted_password.is_some() {
     319              :                 "[FILTERED]"
     320              :             } else {
     321              :                 "(null)"
     322              :             };
     323              :             let action_str = match action {
     324              :                 RoleAction::None => "",
     325              :                 RoleAction::Create => " -> create",
     326              :                 RoleAction::Update => " -> update",
     327              :             };
     328              :             info!(" - {}:{}{}", name, pwd, action_str);
     329              :         }
     330              :     }
     331              : 
     332              :     xact.commit()?;
     333              : 
     334              :     Ok(())
     335              : }
     336              : 
     337              : /// Reassign all dependent objects and delete requested roles.
     338            0 : #[instrument(skip_all)]
     339              : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
     340              :     if let Some(ops) = &spec.delta_operations {
     341              :         // First, reassign all dependent objects to db owners.
     342              :         info!("reassigning dependent objects of to-be-deleted roles");
     343              : 
     344              :         // Fetch existing roles. We could've exported and used `existing_roles` from
     345              :         // `handle_roles()`, but we only make this list there before creating new roles.
     346              :         // Which is probably fine as we never create to-be-deleted roles, but that'd
     347              :         // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
     348              :         // buffers already, so this shouldn't be a big deal.
     349              :         let mut xact = client.transaction()?;
     350              :         let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     351              :         xact.commit()?;
     352              : 
     353              :         for op in ops {
     354              :             // Check that role is still present in Postgres, as this could be a
     355              :             // restart with the same spec after role deletion.
     356            0 :             if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
     357              :                 reassign_owned_objects(spec, connstr, &op.name)?;
     358              :             }
     359              :         }
     360              : 
     361              :         // Second, proceed with role deletions.
     362              :         info!("processing role deletions");
     363              :         let mut xact = client.transaction()?;
     364              :         for op in ops {
     365              :             // We do not check either role exists or not,
     366              :             // Postgres will take care of it for us
     367              :             if op.action == "delete_role" {
     368              :                 let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
     369              : 
     370              :                 warn!("deleting role '{}'", &op.name);
     371              :                 xact.execute(query.as_str(), &[])?;
     372              :             }
     373              :         }
     374              :         xact.commit()?;
     375              :     }
     376              : 
     377              :     Ok(())
     378              : }
     379              : 
     380            0 : fn reassign_owned_objects_in_one_db(
     381            0 :     conf: Config,
     382            0 :     role_name: &PgIdent,
     383            0 :     db_owner: &PgIdent,
     384            0 : ) -> Result<()> {
     385            0 :     let mut client = conf.connect(NoTls)?;
     386              : 
     387              :     // This will reassign all dependent objects to the db owner
     388            0 :     let reassign_query = format!(
     389            0 :         "REASSIGN OWNED BY {} TO {}",
     390            0 :         role_name.pg_quote(),
     391            0 :         db_owner.pg_quote()
     392            0 :     );
     393            0 :     info!(
     394            0 :         "reassigning objects owned by '{}' in db '{}' to '{}'",
     395            0 :         role_name,
     396            0 :         conf.get_dbname().unwrap_or(""),
     397              :         db_owner
     398              :     );
     399            0 :     client.simple_query(&reassign_query)?;
     400              : 
     401              :     // This now will only drop privileges of the role
     402            0 :     let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
     403            0 :     client.simple_query(&drop_query)?;
     404            0 :     Ok(())
     405            0 : }
     406              : 
     407              : // Reassign all owned objects in all databases to the owner of the database.
     408            0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
     409            0 :     for db in &spec.cluster.databases {
     410            0 :         if db.owner != *role_name {
     411            0 :             let mut conf = Config::from_str(connstr)?;
     412            0 :             conf.dbname(&db.name);
     413            0 :             reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
     414            0 :         }
     415              :     }
     416              : 
     417              :     // Also handle case when there are no databases in the spec.
     418              :     // In this case we need to reassign objects in the default database.
     419            0 :     let conf = Config::from_str(connstr)?;
     420            0 :     let db_owner = PgIdent::from_str("cloud_admin")?;
     421            0 :     reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
     422              : 
     423            0 :     Ok(())
     424            0 : }
     425              : 
     426              : /// It follows mostly the same logic as `handle_roles()`, except that it
     427              : /// does not use an explicit transaction block, since major database operations
     428              : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
     429              : /// atomicity should be enough here due to the order of operations and various checks,
     430              : /// which together provide idempotency.
     431            0 : #[instrument(skip_all)]
     432              : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     433              :     let existing_dbs = get_existing_dbs(client)?;
     434              : 
     435              :     // Print a list of existing Postgres databases (only in debug mode)
     436              :     if span_enabled!(Level::INFO) {
     437              :         let mut vec = Vec::new();
     438              :         for (dbname, db) in &existing_dbs {
     439              :             vec.push(format!("{}:{}", dbname, db.owner));
     440              :         }
     441              :         info!("postgres databases (total {}): {:?}", vec.len(), vec);
     442              :     }
     443              : 
     444              :     // Process delta operations first
     445              :     if let Some(ops) = &spec.delta_operations {
     446              :         info!("processing delta operations on databases");
     447              :         for op in ops {
     448              :             match op.action.as_ref() {
     449              :                 // We do not check either DB exists or not,
     450              :                 // Postgres will take care of it for us
     451              :                 "delete_db" => {
     452              :                     // In Postgres we can't drop a database if it is a template.
     453              :                     // So we need to unset the template flag first, but it could
     454              :                     // be a retry, so we could've already dropped the database.
     455              :                     // Check that database exists first to make it idempotent.
     456              :                     let unset_template_query: String = format!(
     457              :                         "
     458              :                         DO $$
     459              :                         BEGIN
     460              :                             IF EXISTS(
     461              :                                 SELECT 1
     462              :                                 FROM pg_catalog.pg_database
     463              :                                 WHERE datname = {}
     464              :                             )
     465              :                             THEN
     466              :                             ALTER DATABASE {} is_template false;
     467              :                             END IF;
     468              :                         END
     469              :                         $$;",
     470              :                         escape_literal(&op.name),
     471              :                         &op.name.pg_quote()
     472              :                     );
     473              :                     // Use FORCE to drop database even if there are active connections.
     474              :                     // We run this from `cloud_admin`, so it should have enough privileges.
     475              :                     // NB: there could be other db states, which prevent us from dropping
     476              :                     // the database. For example, if db is used by any active subscription
     477              :                     // or replication slot.
     478              :                     // TODO: deal with it once we allow logical replication. Proper fix should
     479              :                     // involve returning an error code to the control plane, so it could
     480              :                     // figure out that this is a non-retryable error, return it to the user
     481              :                     // and fail operation permanently.
     482              :                     let drop_db_query: String = format!(
     483              :                         "DROP DATABASE IF EXISTS {} WITH (FORCE)",
     484              :                         &op.name.pg_quote()
     485              :                     );
     486              : 
     487              :                     warn!("deleting database '{}'", &op.name);
     488              :                     client.execute(unset_template_query.as_str(), &[])?;
     489              :                     client.execute(drop_db_query.as_str(), &[])?;
     490              :                 }
     491              :                 "rename_db" => {
     492              :                     let new_name = op.new_name.as_ref().unwrap();
     493              : 
     494              :                     if existing_dbs.contains_key(&op.name) {
     495              :                         let query: String = format!(
     496              :                             "ALTER DATABASE {} RENAME TO {}",
     497              :                             op.name.pg_quote(),
     498              :                             new_name.pg_quote()
     499              :                         );
     500              : 
     501              :                         warn!("renaming database '{}' to '{}'", op.name, new_name);
     502              :                         client.execute(query.as_str(), &[])?;
     503              :                     }
     504              :                 }
     505              :                 _ => {}
     506              :             }
     507              :         }
     508              :     }
     509              : 
     510              :     // Refresh Postgres databases info to handle possible renames
     511              :     let existing_dbs = get_existing_dbs(client)?;
     512              : 
     513              :     info!(
     514              :         "handling cluster spec databases (total {})",
     515              :         spec.cluster.databases.len()
     516              :     );
     517              :     for db in &spec.cluster.databases {
     518              :         let name = &db.name;
     519              :         let pg_db = existing_dbs.get(name);
     520              : 
     521              :         enum DatabaseAction {
     522              :             None,
     523              :             Update,
     524              :             Create,
     525              :         }
     526              :         let action = if let Some(r) = pg_db {
     527              :             // XXX: db owner name is returned as quoted string from Postgres,
     528              :             // when quoting is needed.
     529              :             let new_owner = if r.owner.starts_with('"') {
     530              :                 db.owner.pg_quote()
     531              :             } else {
     532              :                 db.owner.clone()
     533              :             };
     534              : 
     535              :             if new_owner != r.owner {
     536              :                 // Update the owner
     537              :                 DatabaseAction::Update
     538              :             } else {
     539              :                 DatabaseAction::None
     540              :             }
     541              :         } else {
     542              :             DatabaseAction::Create
     543              :         };
     544              : 
     545              :         match action {
     546              :             DatabaseAction::None => {}
     547              :             DatabaseAction::Update => {
     548              :                 let query: String = format!(
     549              :                     "ALTER DATABASE {} OWNER TO {}",
     550              :                     name.pg_quote(),
     551              :                     db.owner.pg_quote()
     552              :                 );
     553              :                 let _guard = info_span!("executing", query).entered();
     554              :                 client.execute(query.as_str(), &[])?;
     555              :             }
     556              :             DatabaseAction::Create => {
     557              :                 let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
     558              :                 query.push_str(&db.to_pg_options());
     559              :                 let _guard = info_span!("executing", query).entered();
     560              :                 client.execute(query.as_str(), &[])?;
     561              :                 let grant_query: String = format!(
     562              :                     "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
     563              :                     name.pg_quote()
     564              :                 );
     565              :                 client.execute(grant_query.as_str(), &[])?;
     566              :             }
     567              :         };
     568              : 
     569              :         if span_enabled!(Level::INFO) {
     570              :             let action_str = match action {
     571              :                 DatabaseAction::None => "",
     572              :                 DatabaseAction::Create => " -> create",
     573              :                 DatabaseAction::Update => " -> update",
     574              :             };
     575              :             info!(" - {}:{}{}", db.name, db.owner, action_str);
     576              :         }
     577              :     }
     578              : 
     579              :     Ok(())
     580              : }
     581              : 
     582              : /// Apply per-database ownership and privilege adjustments, so that database owners
     583              : /// can create trusted extensions and re-create the `public` schema, for example.
                        ///
                        /// Every database listed in the spec must already exist in Postgres, otherwise an
                        /// error is returned. Databases that are invalid or do not allow connections are
                        /// skipped. For each remaining one a dedicated connection is opened (`connstr` with
                        /// the database name swapped in) and used to:
                        /// - hand a `public` schema still owned by `cloud_admin` / `zenith_admin` over to
                        ///   the database owner;
                        /// - apply the `web_access` / `neon_superuser` grants on schema `public`;
                        /// - run [`handle_extension_anon`] when `enable_anon_extension` is set.
     584            0 : #[instrument(skip_all)]
     585              : pub fn handle_grants(
     586              :     spec: &ComputeSpec,
     587              :     client: &mut Client,
     588              :     connstr: &str,
     589              :     enable_anon_extension: bool,
     590              : ) -> Result<()> {
     591              :     info!("modifying database permissions");
     592              :     let known_dbs = get_existing_dbs(client)?;
     593              : 
     594              :     // Ideally these access adjustments would be made when the database is created,
     595              :     // but CREATE DATABASE isn't transactional, so creating a db and granting on it
     596              :     // cannot be done atomically.
     597              :     for db in &spec.cluster.databases {
                                // Everything listed in the spec is expected to exist by now.
     598              :         let pg_db = known_dbs.get(&db.name).ok_or_else(|| {
                                    anyhow!(
                                        "database {} doesn't exist in Postgres after handle_databases()",
                                        db.name
                                    )
                                })?;
     600              :         if pg_db.invalid || pg_db.restrict_conn {
     601              :             info!(
     602              :                 "skipping grants for db {} (invalid: {}, connections not allowed: {})",
     603              :                 db.name, pg_db.invalid, pg_db.restrict_conn
     604              :             );
     605              :             continue;
     606              :         }
     615              : 
                                // Same connection settings as the caller's, but aimed at this database.
     616              :         let mut db_conf = Config::from_str(connstr)?;
     619              :         let mut db_client = db_conf.dbname(&db.name).connect(NoTls)?;
     620              : 
     621              :         // Only the ownership of the schema itself changes here, not that of the
     622              :         // objects inside it. Otherwise `public` stays owned by `cloud_admin` and
     623              :         // the database owner cannot do anything with it. Wrapping this in a SQL
     624              :         // procedure keeps it from erroring out when schema `public` doesn't exist.
     625              :         let public_owner_sql = format!(
     626              :             "DO $$\n\
     627              :                 DECLARE\n\
     628              :                     schema_owner TEXT;\n\
     629              :                 BEGIN\n\
     630              :                     IF EXISTS(\n\
     631              :                         SELECT nspname\n\
     632              :                         FROM pg_catalog.pg_namespace\n\
     633              :                         WHERE nspname = 'public'\n\
     634              :                     )\n\
     635              :                     THEN\n\
     636              :                         SELECT nspowner::regrole::text\n\
     637              :                             FROM pg_catalog.pg_namespace\n\
     638              :                             WHERE nspname = 'public'\n\
     639              :                             INTO schema_owner;\n\
     640              :                 \n\
     641              :                         IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
     642              :                         THEN\n\
     643              :                             ALTER SCHEMA public OWNER TO {};\n\
     644              :                         END IF;\n\
     645              :                     END IF;\n\
     646              :                 END\n\
     647              :             $$;",
     648              :             db.owner.pg_quote()
     649              :         );
     650              :         db_client.simple_query(&public_owner_sql)?;
     651              : 
     652              :         // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user, because
     653              :         // since postgres 15 this privilege is no longer there by default.
     654              :         // TODO: web_access hasn't been created for almost 1 year. There may still be
     655              :         // active users of 1 year old projects, but hopefully not, so check it and
     656              :         // remove this code if possible. The worst thing that could happen is that
     657              :         // user won't be able to use public schema in NEW databases created in the
     658              :         // very OLD project.
     659              :         //
     660              :         // Also, alter default permissions so that relations created by extensions can be
     661              :         // used by neon_superuser without permission issues.
     662              :         let public_grants_sql = "DO $$\n\
     663              :                 BEGIN\n\
     664              :                     IF EXISTS(\n\
     665              :                         SELECT nspname\n\
     666              :                         FROM pg_catalog.pg_namespace\n\
     667              :                         WHERE nspname = 'public'\n\
     668              :                     ) AND\n\
     669              :                     current_setting('server_version_num')::int/10000 >= 15\n\
     670              :                     THEN\n\
     671              :                         IF EXISTS(\n\
     672              :                             SELECT rolname\n\
     673              :                             FROM pg_catalog.pg_roles\n\
     674              :                             WHERE rolname = 'web_access'\n\
     675              :                         )\n\
     676              :                         THEN\n\
     677              :                             GRANT CREATE ON SCHEMA public TO web_access;\n\
     678              :                         END IF;\n\
     679              :                     END IF;\n\
     680              :                     IF EXISTS(\n\
     681              :                         SELECT nspname\n\
     682              :                         FROM pg_catalog.pg_namespace\n\
     683              :                         WHERE nspname = 'public'\n\
     684              :                     )\n\
     685              :                     THEN\n\
     686              :                         ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\
     687              :                         ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\
     688              :                     END IF;\n\
     689              :                 END\n\
     690              :             $$;"
     691              :         .to_string();
     692              : 
     693              :         info!(
     694              :             "grant query for db {} : {}",
     695              :             &db.name,
     696              :             inlinify(&public_grants_sql)
     697              :         );
     698              :         db_client.simple_query(&public_grants_sql)?;
     699              : 
     700              :         // The anon setup has to come last, once all grants above are in place.
     701              :         if enable_anon_extension {
     702              :             handle_extension_anon(spec, &db.owner, &mut db_client, false)
     703              :                 .context("handle_grants handle_extension_anon")?;
     704              :         }
     705              :     }
     706              : 
     707              :     Ok(())
     708              : }
     709              : 
     710              : /// Create required system extensions
                        ///
                        /// Right now that means `pg_stat_statements`, and only when the spec lists it in
                        /// `shared_preload_libraries`; in every other case no query is sent.
     711            0 : #[instrument(skip_all)]
     712              : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
                            let preloads_pg_stat_statements = spec
                                .cluster
                                .settings
                                .find("shared_preload_libraries")
                                .map_or(false, |preloaded| preloaded.contains("pg_stat_statements"));
                            if !preloads_pg_stat_statements {
                                // This compute does not need the extension.
                                return Ok(());
                            }

                            let create_stmt = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
                            info!("creating system extensions with query: {}", create_stmt);
                            client.simple_query(create_stmt)?;
     721              : 
     722              :     Ok(())
     723              : }
     724              : 
     725              : /// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
                        ///
                        /// Steps, in order: make sure schema `neon` exists, create the extension in it, flag
                        /// the extension as relocatable in `pg_extension`, move it into schema `neon`, and
                        /// finally update it. Every step but the last aborts with an error on failure; a
                        /// failed update is only logged.
     726            0 : #[instrument(skip_all)]
     727              : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
     728              :     info!("handle extension neon");
     729              : 
     731              :     client.simple_query("CREATE SCHEMA IF NOT EXISTS neon")?;
     732              : 
     733              :     let create_stmt = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
     734              :     info!("create neon extension with query: {}", create_stmt);
     735              :     client.simple_query(create_stmt)?;
     736              : 
                            // NOTE(review): presumably this lets the SET SCHEMA below go through for an
                            // extension that was installed as non-relocatable; confirm.
     737              :     let relocatable_stmt = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
     738              :     client.simple_query(relocatable_stmt)?;
     739              : 
     740              :     let set_schema_stmt = "ALTER EXTENSION neon SET SCHEMA neon";
     741              :     info!("alter neon extension schema with query: {}", set_schema_stmt);
     742              :     client.simple_query(set_schema_stmt)?;
     743              : 
     744              :     // The update is a no-op when the extension is already up to date,
     745              :     // which is the case in two situations:
     746              :     // - extension was just installed
     747              :     // - extension was already installed and is up to date
     748              :     let update_stmt = "ALTER EXTENSION neon UPDATE";
     749              :     info!("update neon extension version with query: {}", update_stmt);
     750              :     match client.simple_query(update_stmt) {
                                Ok(_) => {}
     751              :         Err(e) => error!(
     752              :             "failed to upgrade neon extension during `handle_extension_neon`: {}",
     753              :             e
     754              :         ),
     755              :     }
     756              : 
     757              :     Ok(())
     758              : }
     759              : 
                        /// Run `ALTER EXTENSION neon UPDATE` on the given connection.
                        ///
                        /// A failing query is returned to the caller as an error.
     760            0 : #[instrument(skip_all)]
     761              : pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
                            const UPGRADE_STMT: &str = "ALTER EXTENSION neon UPDATE";

     762              :     info!("handle neon extension upgrade");
     764              :     info!("update neon extension version with query: {}", UPGRADE_STMT);
     765              :     client.simple_query(UPGRADE_STMT)?;
     766              : 
     767              :     Ok(())
     768              : }
     769              : 
                        /// Hand the built-in compute migrations to [`MigrationRunner`] and run them.
                        ///
                        /// The SQL scripts are embedded into the binary with `include_str!` and passed to
                        /// the runner as a single array. Any error returned by `run_migrations()` is
                        /// propagated to the caller.
     770            0 : #[instrument(skip_all)]
     771              : pub fn handle_migrations(client: &mut Client) -> Result<()> {
     772              :     info!("handle migrations");
     773              : 
     774              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     775              :     // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
     776              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     777              : 
     778              :     // Add new migrations in numerical order.
                            // NOTE(review): the append-only rule above suggests the runner identifies
                            // migrations by their position in this array; confirm in `crate::migration`
                            // before reordering or removing entries.
     779              :     let migrations = [
     780              :         include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
     781              :         include_str!("./migrations/0002-alter_roles.sql"),
     782              :         include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
     783              :         include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
     784              :         include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
     785              :         include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
     786              :         include_str!(
     787              :             "./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
     788              :         ),
     789              :         include_str!(
     790              :             "./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
     791              :         ),
     792              :         include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
     793              :         include_str!(
     794              :             "./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
     795              :         ),
     796              :         include_str!(
     797              :             "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
     798              :         ),
     799              :     ];
     800              : 
                            // The runner is created for this one call only and borrows `client` for it.
     801              :     MigrationRunner::new(client, &migrations).run_migrations()?;
     802              : 
     803              :     Ok(())
     804              : }
     805              : 
     806              : /// Connect to the database as superuser and pre-create anon extension
     807              : /// if it is present in shared_preload_libraries
                        ///
                        /// `db_owner` is quoted as an identifier before it is put into any statement. With
                        /// `grants_only` set, creating and initializing the extension is skipped and only
                        /// the grants and ownership changes are applied.
                        ///
                        /// This is largely best effort: failing to create or initialize the extension, not
                        /// finding it installed, or failing to re-own a single routine is logged and
                        /// reported as `Ok(())`. A failing GRANT, or a failing catalog query in one of the
                        /// steps that use `?`, is returned as an error.
     808            0 : #[instrument(skip_all)]
     809              : pub fn handle_extension_anon(
     810              :     spec: &ComputeSpec,
     811              :     db_owner: &str,
     812              :     db_client: &mut Client,
     813              :     grants_only: bool,
     814              : ) -> Result<()> {
     815              :     info!("handle extension anon");
     816              : 
                            // Nothing to do unless this compute preloads the anon library.
     817              :     let anon_preloaded = spec
                                .cluster
                                .settings
                                .find("shared_preload_libraries")
     818              :         .map_or(false, |libs| libs.contains("anon"));
                            if !anon_preloaded {
                                return Ok(());
                            }

     819              :     if !grants_only {
     820              :         // check if extension is already initialized using anon.is_initialized()
     821              :         let query = "SELECT anon.is_initialized()";
     822              :         match db_client.query(query, &[]) {
     823              :             Ok(rows) => {
     824              :                 if let Some(row) = rows.first() {
     825              :                     let is_initialized: bool = row.get(0);
     826              :                     if is_initialized {
     827              :                         info!("anon extension is already initialized");
     828              :                         return Ok(());
     829              :                     }
     830              :                 }
     831              :             }
     832              :             Err(e) => {
     833              :                 warn!(
     834              :                     "anon extension is_installed check failed with expected error: {}",
     835              :                     e
     836              :                 );
     837              :             }
     838              :         };
     839              : 
     840              :         // Create anon extension if this compute needs it
     841              :         // Users cannot create it themselves, because superuser is required.
     842              :         let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
     843              :         info!("creating anon extension with query: {}", query);
     844              :         if let Err(e) = db_client.query(query, &[]) {
     847              :             error!("anon extension creation failed with error: {}", e);
     848              :             return Ok(());
     850              :         }
     851              : 
     852              :         // check that extension is installed
     853              :         query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
     854              :         let rows = db_client.query(query, &[])?;
     855              :         if rows.is_empty() {
     856              :             error!("anon extension is not installed");
     857              :             return Ok(());
     858              :         }
     859              : 
     860              :         // Initialize anon extension
     861              :         // This also requires superuser privileges, so users cannot do it themselves.
     862              :         query = "SELECT anon.init()";
     863              :         if let Err(e) = db_client.query(query, &[]) {
     866              :             error!("anon.init() failed with error: {}", e);
     867              :             return Ok(());
     869              :         }
     870              :     }
     871              : 
     872              :     // check that extension is installed, if not bail early
     873              :     let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
     874              :     match db_client.query(query, &[]) {
     875              :         Ok(rows) => {
     876              :             if rows.is_empty() {
     877              :                 error!("anon extension is not installed");
     878              :                 return Ok(());
     879              :             }
     880              :         }
     881              :         Err(e) => {
     882              :             error!("anon extension check failed with error: {}", e);
     883              :             return Ok(());
     884              :         }
     885              :     };
     886              : 
                            // The role name comes from the spec and may need quoting (upper case, dashes,
                            // embedded quotes, ...). Quote it once, the same way the rest of this file
                            // quotes identifiers, before it goes into any statement.
                            let owner = db_owner.to_owned().pg_quote();

     887              :     let query = format!("GRANT ALL ON SCHEMA anon TO {}", owner);
     888              :     info!("granting anon extension permissions with query: {}", query);
     889              :     db_client.simple_query(&query)?;
     890              : 
     891              :     // Grant permissions to db_owner to use anon extension functions
     892              :     let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", owner);
     893              :     info!("granting anon extension permissions with query: {}", query);
     894              :     db_client.simple_query(&query)?;
     895              : 
     896              :     // This is needed, because some functions are defined as SECURITY DEFINER.
     897              :     // In Postgres SECURITY DEFINER functions are executed with the privileges
     898              :     // of the owner.
     899              :     // In anon extension this is needed to access some GUCs, which are only accessible to
     900              :     // superuser. But we've patched postgres to allow db_owner to access them as well.
     901              :     // So we need to change owner of these functions to db_owner.
                            //
                            // A catalog query can only list the routines, so every ownership change is
                            // sent as its own statement. The `regprocedure` text form is already quoted
                            // (and schema-qualified where the search_path requires it), so it is used
                            // verbatim. `ALTER ROUTINE` is used because it covers plain functions,
                            // procedures and aggregates alike.
     902              :     let query = "SELECT p.oid::regprocedure::text \
     903              :                  FROM pg_catalog.pg_proc p \
     905              :                  JOIN pg_catalog.pg_namespace nsp ON p.pronamespace = nsp.oid \
     906              :                  WHERE nsp.nspname = 'anon'";
     907              : 
     908              :     info!("change anon extension functions owner to db owner");
     909              :     for row in db_client.query(query, &[])? {
                                let routine: String = row.get(0);
                                let query = format!("ALTER ROUTINE {} OWNER TO {}", routine, owner);
                                // Best effort, like the rest of the anon setup: one routine that cannot
                                // be re-owned must not fail the whole configuration run.
                                if let Err(e) = db_client.simple_query(&query) {
                                    error!("failed to change owner of anon routine {}: {}", routine, e);
                                }
                            }
     910              : 
     911              :     //  affects views as well
     912              :     let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", owner);
     913              :     info!("granting anon extension permissions with query: {}", query);
     914              :     db_client.simple_query(&query)?;
     915              : 
     916              :     let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", owner);
     917              :     info!("granting anon extension permissions with query: {}", query);
     918              :     db_client.simple_query(&query)?;
     921              : 
     922              :     Ok(())
     923              : }
        

Generated by: LCOV version 2.1-beta