LCOV - code coverage report
Current view: top level - compute_tools/src - spec.rs (source / functions) Coverage Total Hit
Test: b9d67f908f91f00e353a27440ba89f642a869959.info Lines: 0.0 % 127 0
Test Date: 2024-11-19 21:44:13 Functions: 0.0 % 20 0

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::fs::File;
       3              : use std::path::Path;
       4              : use std::str::FromStr;
       5              : 
       6              : use anyhow::{anyhow, bail, Context, Result};
       7              : use postgres::config::Config;
       8              : use postgres::{Client, NoTls};
       9              : use reqwest::StatusCode;
      10              : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
      11              : 
      12              : use crate::config;
      13              : use crate::logger::inlinify;
      14              : use crate::migration::MigrationRunner;
      15              : use crate::params::PG_HBA_ALL_MD5;
      16              : use crate::pg_helpers::*;
      17              : 
      18              : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
      19              : use compute_api::spec::{ComputeSpec, PgIdent, Role};
      20              : 
      21              : // Do control plane request and return response if any. In case of error it
      22              : // returns a bool flag indicating whether it makes sense to retry the request
      23              : // and a string with error message.
      24            0 : fn do_control_plane_request(
      25            0 :     uri: &str,
      26            0 :     jwt: &str,
      27            0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
      28            0 :     let resp = reqwest::blocking::Client::new()
      29            0 :         .get(uri)
      30            0 :         .header("Authorization", format!("Bearer {}", jwt))
      31            0 :         .send()
      32            0 :         .map_err(|e| {
      33            0 :             (
      34            0 :                 true,
      35            0 :                 format!("could not perform spec request to control plane: {}", e),
      36            0 :             )
      37            0 :         })?;
      38              : 
      39            0 :     match resp.status() {
      40            0 :         StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
      41            0 :             Ok(spec_resp) => Ok(spec_resp),
      42            0 :             Err(e) => Err((
      43            0 :                 true,
      44            0 :                 format!("could not deserialize control plane response: {}", e),
      45            0 :             )),
      46              :         },
      47              :         StatusCode::SERVICE_UNAVAILABLE => {
      48            0 :             Err((true, "control plane is temporarily unavailable".to_string()))
      49              :         }
      50              :         StatusCode::BAD_GATEWAY => {
      51              :             // We have a problem with intermittent 502 errors now
      52              :             // https://github.com/neondatabase/cloud/issues/2353
      53              :             // It's fine to retry GET request in this case.
      54            0 :             Err((true, "control plane request failed with 502".to_string()))
      55              :         }
      56              :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      57              :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      58            0 :         _ => Err((
      59            0 :             false,
      60            0 :             format!(
      61            0 :                 "unexpected control plane response status code: {}",
      62            0 :                 resp.status()
      63            0 :             ),
      64            0 :         )),
      65              :     }
      66            0 : }
      67              : 
      68              : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
      69              : /// env variable is set, it will be used for authorization.
      70            0 : pub fn get_spec_from_control_plane(
      71            0 :     base_uri: &str,
      72            0 :     compute_id: &str,
      73            0 : ) -> Result<Option<ComputeSpec>> {
      74            0 :     let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
      75            0 :     let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
      76            0 :         Ok(v) => v,
      77            0 :         Err(_) => "".to_string(),
      78              :     };
      79            0 :     let mut attempt = 1;
      80            0 :     let mut spec: Result<Option<ComputeSpec>> = Ok(None);
      81            0 : 
      82            0 :     info!("getting spec from control plane: {}", cp_uri);
      83              : 
      84              :     // Do 3 attempts to get spec from the control plane using the following logic:
      85              :     // - network error -> then retry
      86              :     // - compute id is unknown or any other error -> bail out
      87              :     // - no spec for compute yet (Empty state) -> return Ok(None)
      88              :     // - got spec -> return Ok(Some(spec))
      89            0 :     while attempt < 4 {
      90            0 :         spec = match do_control_plane_request(&cp_uri, &jwt) {
      91            0 :             Ok(spec_resp) => match spec_resp.status {
      92            0 :                 ControlPlaneComputeStatus::Empty => Ok(None),
      93              :                 ControlPlaneComputeStatus::Attached => {
      94            0 :                     if let Some(spec) = spec_resp.spec {
      95            0 :                         Ok(Some(spec))
      96              :                     } else {
      97            0 :                         bail!("compute is attached, but spec is empty")
      98              :                     }
      99              :                 }
     100              :             },
     101            0 :             Err((retry, msg)) => {
     102            0 :                 if retry {
     103            0 :                     Err(anyhow!(msg))
     104              :                 } else {
     105            0 :                     bail!(msg);
     106              :                 }
     107              :             }
     108              :         };
     109              : 
     110            0 :         if let Err(e) = &spec {
     111            0 :             error!("attempt {} to get spec failed with: {}", attempt, e);
     112              :         } else {
     113            0 :             return spec;
     114              :         }
     115              : 
     116            0 :         attempt += 1;
     117            0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     118              :     }
     119              : 
     120              :     // All attempts failed, return error.
     121            0 :     spec
     122            0 : }
     123              : 
     124              : /// Check `pg_hba.conf` and update if needed to allow external connections.
     125            0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     126            0 :     // XXX: consider making it a part of spec.json
     127            0 :     info!("checking pg_hba.conf");
     128            0 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     129            0 : 
     130            0 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     131            0 :         info!("updated pg_hba.conf to allow external connections");
     132              :     } else {
     133            0 :         info!("pg_hba.conf is up-to-date");
     134              :     }
     135              : 
     136            0 :     Ok(())
     137            0 : }
     138              : 
     139              : /// Create a standby.signal file
     140            0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     141            0 :     // XXX: consider making it a part of spec.json
     142            0 :     info!("adding standby.signal");
     143            0 :     let signalfile = pgdata_path.join("standby.signal");
     144            0 : 
     145            0 :     if !signalfile.exists() {
     146            0 :         info!("created standby.signal");
     147            0 :         File::create(signalfile)?;
     148              :     } else {
     149            0 :         info!("reused pre-existing standby.signal");
     150              :     }
     151            0 :     Ok(())
     152            0 : }
     153              : 
     154              : /// Compute could be unexpectedly shut down, for example, during the
     155              : /// database dropping. This leaves the database in the invalid state,
     156              : /// which prevents new db creation with the same name. This function
     157              : /// will clean it up before proceeding with catalog updates. All
     158              : /// possible future cleanup operations may go here too.
     159            0 : #[instrument(skip_all)]
     160              : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
     161              :     let existing_dbs = get_existing_dbs(client)?;
     162              : 
     163              :     for (_, db) in existing_dbs {
     164              :         if db.invalid {
     165              :             // After recent commit in Postgres, interrupted DROP DATABASE
     166              :             // leaves the database in the invalid state. According to the
     167              :             // commit message, the only option for user is to drop it again.
     168              :             // See:
     169              :             //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     170              :             //
     171              :             // Postgres Neon extension is done the way, that db is de-registered
     172              :             // in the control plane metadata only after it is dropped. So there is
     173              :             // a chance that it still thinks that db should exist. This means
     174              :             // that it will be re-created by `handle_databases()`. Yet, it's fine
     175              :             // as user can just repeat drop (in vanilla Postgres they would need
     176              :             // to do the same, btw).
     177              :             let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
     178              :             info!("dropping invalid database {}", db.name);
     179              :             client.execute(query.as_str(), &[])?;
     180              :         }
     181              :     }
     182              : 
     183              :     Ok(())
     184              : }
     185              : 
     186              : /// Given a cluster spec json and open transaction it handles roles creation,
     187              : /// deletion and update.
     188            0 : #[instrument(skip_all)]
     189              : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     190              :     let mut xact = client.transaction()?;
     191              :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     192              : 
     193              :     let mut jwks_roles = HashSet::new();
     194              :     if let Some(local_proxy) = &spec.local_proxy_config {
     195              :         for jwks_setting in local_proxy.jwks.iter().flatten() {
     196              :             for role_name in &jwks_setting.role_names {
     197              :                 jwks_roles.insert(role_name.clone());
     198              :             }
     199              :         }
     200              :     }
     201              : 
     202              :     // Print a list of existing Postgres roles (only in debug mode)
     203              :     if span_enabled!(Level::INFO) {
     204              :         let mut vec = Vec::new();
     205              :         for r in &existing_roles {
     206              :             vec.push(format!(
     207              :                 "{}:{}",
     208              :                 r.name,
     209              :                 if r.encrypted_password.is_some() {
     210              :                     "[FILTERED]"
     211              :                 } else {
     212              :                     "(null)"
     213              :                 }
     214              :             ));
     215              :         }
     216              : 
     217              :         info!("postgres roles (total {}): {:?}", vec.len(), vec);
     218              :     }
     219              : 
     220              :     // Process delta operations first
     221              :     if let Some(ops) = &spec.delta_operations {
     222              :         info!("processing role renames");
     223              :         for op in ops {
     224              :             match op.action.as_ref() {
     225              :                 "delete_role" => {
     226              :                     // no-op now, roles will be deleted at the end of configuration
     227              :                 }
     228              :                 // Renaming role drops its password, since role name is
     229              :                 // used as a salt there.  It is important that this role
     230              :                 // is recorded with a new `name` in the `roles` list.
     231              :                 // Follow up roles update will set the new password.
     232              :                 "rename_role" => {
     233              :                     let new_name = op.new_name.as_ref().unwrap();
     234              : 
     235              :                     // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     236            0 :                     if existing_roles.iter().any(|r| r.name == op.name) {
     237              :                         let query: String = format!(
     238              :                             "ALTER ROLE {} RENAME TO {}",
     239              :                             op.name.pg_quote(),
     240              :                             new_name.pg_quote()
     241              :                         );
     242              : 
     243              :                         warn!("renaming role '{}' to '{}'", op.name, new_name);
     244              :                         xact.execute(query.as_str(), &[])?;
     245              :                     }
     246              :                 }
     247              :                 _ => {}
     248              :             }
     249              :         }
     250              :     }
     251              : 
     252              :     // Refresh Postgres roles info to handle possible roles renaming
     253              :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     254              : 
     255              :     info!(
     256              :         "handling cluster spec roles (total {})",
     257              :         spec.cluster.roles.len()
     258              :     );
     259              :     for role in &spec.cluster.roles {
     260              :         let name = &role.name;
     261              :         // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     262            0 :         let pg_role = existing_roles.iter().find(|r| r.name == *name);
     263              : 
     264              :         enum RoleAction {
     265              :             None,
     266              :             Update,
     267              :             Create,
     268              :         }
     269              :         let action = if let Some(r) = pg_role {
     270              :             if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
     271              :                 || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
     272              :             {
     273              :                 RoleAction::Update
     274              :             } else if let Some(pg_pwd) = &r.encrypted_password {
     275              :                 // Check whether password changed or not (trim 'md5' prefix first if any)
     276              :                 //
     277              :                 // This is a backward compatibility hack, which comes from the times when we were using
     278              :                 // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
     279              :                 // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
     280              :                 // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
     281              :                 // Here is the only place so far where we compare hashes, so it seems to be the best candidate
     282              :                 // to place this compatibility layer.
     283              :                 let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
     284              :                     stripped
     285              :                 } else {
     286              :                     pg_pwd
     287              :                 };
     288              :                 if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
     289              :                     RoleAction::Update
     290              :                 } else {
     291              :                     RoleAction::None
     292              :                 }
     293              :             } else {
     294              :                 RoleAction::None
     295              :             }
     296              :         } else {
     297              :             RoleAction::Create
     298              :         };
     299              : 
     300              :         match action {
     301              :             RoleAction::None => {}
     302              :             RoleAction::Update => {
     303              :                 // This can be run on /every/ role! Not just ones created through the console.
     304              :                 // This means that if you add some funny ALTER here that adds a permission,
     305              :                 // this will get run even on user-created roles! This will result in different
     306              :                 // behavior before and after a spec gets reapplied. The below ALTER as it stands
     307              :                 // now only grants LOGIN and changes the password. Please do not allow this branch
     308              :                 // to do anything silly.
     309              :                 let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
     310              :                 query.push_str(&role.to_pg_options());
     311              :                 xact.execute(query.as_str(), &[])?;
     312              :             }
     313              :             RoleAction::Create => {
     314              :                 // This branch only runs when roles are created through the console, so it is
     315              :                 // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
     316              :                 // from neon_superuser.
     317              :                 let mut query: String = format!(
     318              :                     "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
     319              :                     name.pg_quote()
     320              :                 );
     321              :                 if jwks_roles.contains(name.as_str()) {
     322              :                     query = format!("CREATE ROLE {}", name.pg_quote());
     323              :                 }
     324              :                 info!("running role create query: '{}'", &query);
     325              :                 query.push_str(&role.to_pg_options());
     326              :                 xact.execute(query.as_str(), &[])?;
     327              :             }
     328              :         }
     329              : 
     330              :         if span_enabled!(Level::INFO) {
     331              :             let pwd = if role.encrypted_password.is_some() {
     332              :                 "[FILTERED]"
     333              :             } else {
     334              :                 "(null)"
     335              :             };
     336              :             let action_str = match action {
     337              :                 RoleAction::None => "",
     338              :                 RoleAction::Create => " -> create",
     339              :                 RoleAction::Update => " -> update",
     340              :             };
     341              :             info!(" - {}:{}{}", name, pwd, action_str);
     342              :         }
     343              :     }
     344              : 
     345              :     xact.commit()?;
     346              : 
     347              :     Ok(())
     348              : }
     349              : 
     350              : /// Reassign all dependent objects and delete requested roles.
     351            0 : #[instrument(skip_all)]
     352              : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
     353              :     if let Some(ops) = &spec.delta_operations {
     354              :         // First, reassign all dependent objects to db owners.
     355              :         info!("reassigning dependent objects of to-be-deleted roles");
     356              : 
     357              :         // Fetch existing roles. We could've exported and used `existing_roles` from
     358              :         // `handle_roles()`, but we only make this list there before creating new roles.
     359              :         // Which is probably fine as we never create to-be-deleted roles, but that'd
     360              :         // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
     361              :         // buffers already, so this shouldn't be a big deal.
     362              :         let mut xact = client.transaction()?;
     363              :         let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     364              :         xact.commit()?;
     365              : 
     366              :         for op in ops {
     367              :             // Check that role is still present in Postgres, as this could be a
     368              :             // restart with the same spec after role deletion.
     369            0 :             if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
     370              :                 reassign_owned_objects(spec, connstr, &op.name)?;
     371              :             }
     372              :         }
     373              : 
     374              :         // Second, proceed with role deletions.
     375              :         info!("processing role deletions");
     376              :         let mut xact = client.transaction()?;
     377              :         for op in ops {
     378              :             // We do not check either role exists or not,
     379              :             // Postgres will take care of it for us
     380              :             if op.action == "delete_role" {
     381              :                 let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
     382              : 
     383              :                 warn!("deleting role '{}'", &op.name);
     384              :                 xact.execute(query.as_str(), &[])?;
     385              :             }
     386              :         }
     387              :         xact.commit()?;
     388              :     }
     389              : 
     390              :     Ok(())
     391              : }
     392              : 
     393            0 : fn reassign_owned_objects_in_one_db(
     394            0 :     conf: Config,
     395            0 :     role_name: &PgIdent,
     396            0 :     db_owner: &PgIdent,
     397            0 : ) -> Result<()> {
     398            0 :     let mut client = conf.connect(NoTls)?;
     399              : 
     400              :     // This will reassign all dependent objects to the db owner
     401            0 :     let reassign_query = format!(
     402            0 :         "REASSIGN OWNED BY {} TO {}",
     403            0 :         role_name.pg_quote(),
     404            0 :         db_owner.pg_quote()
     405            0 :     );
     406            0 :     info!(
     407            0 :         "reassigning objects owned by '{}' in db '{}' to '{}'",
     408            0 :         role_name,
     409            0 :         conf.get_dbname().unwrap_or(""),
     410              :         db_owner
     411              :     );
     412            0 :     client.simple_query(&reassign_query)?;
     413              : 
     414              :     // This now will only drop privileges of the role
     415            0 :     let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
     416            0 :     client.simple_query(&drop_query)?;
     417            0 :     Ok(())
     418            0 : }
     419              : 
     420              : // Reassign all owned objects in all databases to the owner of the database.
     421            0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
     422            0 :     for db in &spec.cluster.databases {
     423            0 :         if db.owner != *role_name {
     424            0 :             let mut conf = Config::from_str(connstr)?;
     425            0 :             conf.dbname(&db.name);
     426            0 :             reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
     427            0 :         }
     428              :     }
     429              : 
     430              :     // Also handle case when there are no databases in the spec.
     431              :     // In this case we need to reassign objects in the default database.
     432            0 :     let conf = Config::from_str(connstr)?;
     433            0 :     let db_owner = PgIdent::from_str("cloud_admin")?;
     434            0 :     reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
     435              : 
     436            0 :     Ok(())
     437            0 : }
     438              : 
     439              : /// It follows mostly the same logic as `handle_roles()` excepting that we
     440              : /// does not use an explicit transactions block, since major database operations
     441              : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
     442              : /// atomicity should be enough here due to the order of operations and various checks,
     443              : /// which together provide us idempotency.
     444            0 : #[instrument(skip_all)]
     445              : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     446              :     let existing_dbs = get_existing_dbs(client)?;
     447              : 
     448              :     // Print a list of existing Postgres databases (only in debug mode)
     449              :     if span_enabled!(Level::INFO) {
     450              :         let mut vec = Vec::new();
     451              :         for (dbname, db) in &existing_dbs {
     452              :             vec.push(format!("{}:{}", dbname, db.owner));
     453              :         }
     454              :         info!("postgres databases (total {}): {:?}", vec.len(), vec);
     455              :     }
     456              : 
     457              :     // Process delta operations first
     458              :     if let Some(ops) = &spec.delta_operations {
     459              :         info!("processing delta operations on databases");
     460              :         for op in ops {
     461              :             match op.action.as_ref() {
     462              :                 // We do not check either DB exists or not,
     463              :                 // Postgres will take care of it for us
     464              :                 "delete_db" => {
     465              :                     // In Postgres we can't drop a database if it is a template.
     466              :                     // So we need to unset the template flag first, but it could
     467              :                     // be a retry, so we could've already dropped the database.
     468              :                     // Check that database exists first to make it idempotent.
     469              :                     let unset_template_query: String = format!(
     470              :                         "
     471              :                         DO $$
     472              :                         BEGIN
     473              :                             IF EXISTS(
     474              :                                 SELECT 1
     475              :                                 FROM pg_catalog.pg_database
     476              :                                 WHERE datname = {}
     477              :                             )
     478              :                             THEN
     479              :                             ALTER DATABASE {} is_template false;
     480              :                             END IF;
     481              :                         END
     482              :                         $$;",
     483              :                         escape_literal(&op.name),
     484              :                         &op.name.pg_quote()
     485              :                     );
     486              :                     // Use FORCE to drop database even if there are active connections.
     487              :                     // We run this from `cloud_admin`, so it should have enough privileges.
     488              :                     // NB: there could be other db states, which prevent us from dropping
     489              :                     // the database. For example, if db is used by any active subscription
     490              :                     // or replication slot.
     491              :                     // TODO: deal with it once we allow logical replication. Proper fix should
     492              :                     // involve returning an error code to the control plane, so it could
     493              :                     // figure out that this is a non-retryable error, return it to the user
     494              :                     // and fail operation permanently.
     495              :                     let drop_db_query: String = format!(
     496              :                         "DROP DATABASE IF EXISTS {} WITH (FORCE)",
     497              :                         &op.name.pg_quote()
     498              :                     );
     499              : 
     500              :                     warn!("deleting database '{}'", &op.name);
     501              :                     client.execute(unset_template_query.as_str(), &[])?;
     502              :                     client.execute(drop_db_query.as_str(), &[])?;
     503              :                 }
     504              :                 "rename_db" => {
     505              :                     let new_name = op.new_name.as_ref().unwrap();
     506              : 
     507              :                     if existing_dbs.contains_key(&op.name) {
     508              :                         let query: String = format!(
     509              :                             "ALTER DATABASE {} RENAME TO {}",
     510              :                             op.name.pg_quote(),
     511              :                             new_name.pg_quote()
     512              :                         );
     513              : 
     514              :                         warn!("renaming database '{}' to '{}'", op.name, new_name);
     515              :                         client.execute(query.as_str(), &[])?;
     516              :                     }
     517              :                 }
     518              :                 _ => {}
     519              :             }
     520              :         }
     521              :     }
     522              : 
     523              :     // Refresh Postgres databases info to handle possible renames
     524              :     let existing_dbs = get_existing_dbs(client)?;
     525              : 
     526              :     info!(
     527              :         "handling cluster spec databases (total {})",
     528              :         spec.cluster.databases.len()
     529              :     );
     530              :     for db in &spec.cluster.databases {
     531              :         let name = &db.name;
     532              :         let pg_db = existing_dbs.get(name);
     533              : 
     534              :         enum DatabaseAction {
     535              :             None,
     536              :             Update,
     537              :             Create,
     538              :         }
     539              :         let action = if let Some(r) = pg_db {
     540              :             // XXX: db owner name is returned as quoted string from Postgres,
     541              :             // when quoting is needed.
     542              :             let new_owner = if r.owner.starts_with('"') {
     543              :                 db.owner.pg_quote()
     544              :             } else {
     545              :                 db.owner.clone()
     546              :             };
     547              : 
     548              :             if new_owner != r.owner {
     549              :                 // Update the owner
     550              :                 DatabaseAction::Update
     551              :             } else {
     552              :                 DatabaseAction::None
     553              :             }
     554              :         } else {
     555              :             DatabaseAction::Create
     556              :         };
     557              : 
     558              :         match action {
     559              :             DatabaseAction::None => {}
     560              :             DatabaseAction::Update => {
     561              :                 let query: String = format!(
     562              :                     "ALTER DATABASE {} OWNER TO {}",
     563              :                     name.pg_quote(),
     564              :                     db.owner.pg_quote()
     565              :                 );
     566              :                 let _guard = info_span!("executing", query).entered();
     567              :                 client.execute(query.as_str(), &[])?;
     568              :             }
     569              :             DatabaseAction::Create => {
     570              :                 let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
     571              :                 query.push_str(&db.to_pg_options());
     572              :                 let _guard = info_span!("executing", query).entered();
     573              :                 client.execute(query.as_str(), &[])?;
     574              :                 let grant_query: String = format!(
     575              :                     "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
     576              :                     name.pg_quote()
     577              :                 );
     578              :                 client.execute(grant_query.as_str(), &[])?;
     579              :             }
     580              :         };
     581              : 
     582              :         if span_enabled!(Level::INFO) {
     583              :             let action_str = match action {
     584              :                 DatabaseAction::None => "",
     585              :                 DatabaseAction::Create => " -> create",
     586              :                 DatabaseAction::Update => " -> update",
     587              :             };
     588              :             info!(" - {}:{}{}", db.name, db.owner, action_str);
     589              :         }
     590              :     }
     591              : 
     592              :     Ok(())
     593              : }
     594              : 
     595              : /// For every database in the spec: make the database owner the owner of the `public` schema (if
     596              : /// it still belongs to `cloud_admin`/`zenith_admin`) and run the grants on `public` described below.
     597            0 : #[instrument(skip_all)]
     598              : pub fn handle_grants(
     599              :     spec: &ComputeSpec,
     600              :     client: &mut Client,
     601              :     connstr: &str,
     602              :     enable_anon_extension: bool,
     603              : ) -> Result<()> {
     604              :     info!("modifying database permissions");
     605              :     let existing_dbs = get_existing_dbs(client)?;
     606              : 
     607              :     // Do some per-database access adjustments. We'd better do this at db creation time,
     608              :     // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
     609              :     // atomically.
     610              :     for db in &spec.cluster.databases {
     611              :         match existing_dbs.get(&db.name) {
     612              :             Some(pg_db) => {
     613              :                 if pg_db.restrict_conn || pg_db.invalid {
     614              :                     info!(
     615              :                         "skipping grants for db {} (invalid: {}, connections not allowed: {})",
     616              :                         db.name, pg_db.invalid, pg_db.restrict_conn
     617              :                     );
     618              :                     continue;
     619              :                 }
     620              :             }
     621              :             None => {
     622              :                 bail!(
     623              :                     "database {} doesn't exist in Postgres after handle_databases()",
     624              :                     db.name
     625              :                 );
     626              :             }
     627              :         }
     628              : 
     629              :         let mut conf = Config::from_str(connstr)?;
     630              :         conf.dbname(&db.name); // reuse `connstr`, but connect to this particular database
     631              : 
     632              :         let mut db_client = conf.connect(NoTls)?;
     633              : 
     634              :         // This will only change ownership on the schema itself, not the objects
     635              :         // inside it. Without it owner of the `public` schema will be `cloud_admin`
     636              :         // and database owner cannot do anything with it. SQL procedure ensures
     637              :         // that it won't error out if schema `public` doesn't exist.
     638              :         let alter_query = format!(
     639              :             "DO $$\n\
     640              :                 DECLARE\n\
     641              :                     schema_owner TEXT;\n\
     642              :                 BEGIN\n\
     643              :                     IF EXISTS(\n\
     644              :                         SELECT nspname\n\
     645              :                         FROM pg_catalog.pg_namespace\n\
     646              :                         WHERE nspname = 'public'\n\
     647              :                     )\n\
     648              :                     THEN\n\
     649              :                         SELECT nspowner::regrole::text\n\
     650              :                             FROM pg_catalog.pg_namespace\n\
     651              :                             WHERE nspname = 'public'\n\
     652              :                             INTO schema_owner;\n\
     653              :                 \n\
     654              :                         IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
     655              :                         THEN\n\
     656              :                             ALTER SCHEMA public OWNER TO {};\n\
     657              :                         END IF;\n\
     658              :                     END IF;\n\
     659              :                 END\n\
     660              :             $$;",
     661              :             db.owner.pg_quote()
     662              :         );
     663              :         db_client.simple_query(&alter_query)?;
     664              : 
     665              :         // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
     666              :         // This is needed because since postgres 15 this privilege is removed by default.
     667              :         // TODO: web_access isn't created for almost 1 year. It could be that we have
     668              :         // active users of 1 year old projects, but hopefully not, so check it and
     669              :         // remove this code if possible. The worst thing that could happen is that
     670              :         // user won't be able to use public schema in NEW databases created in the
     671              :         // very OLD project.
     672              :         //
     673              :         // Also, alter default permissions so that relations created by extensions can be
     674              :         // used by neon_superuser without permission issues.
     675              :         let grant_query = "DO $$\n\
     676              :                 BEGIN\n\
     677              :                     IF EXISTS(\n\
     678              :                         SELECT nspname\n\
     679              :                         FROM pg_catalog.pg_namespace\n\
     680              :                         WHERE nspname = 'public'\n\
     681              :                     ) AND\n\
     682              :                     current_setting('server_version_num')::int/10000 >= 15\n\
     683              :                     THEN\n\
     684              :                         IF EXISTS(\n\
     685              :                             SELECT rolname\n\
     686              :                             FROM pg_catalog.pg_roles\n\
     687              :                             WHERE rolname = 'web_access'\n\
     688              :                         )\n\
     689              :                         THEN\n\
     690              :                             GRANT CREATE ON SCHEMA public TO web_access;\n\
     691              :                         END IF;\n\
     692              :                     END IF;\n\
     693              :                     IF EXISTS(\n\
     694              :                         SELECT nspname\n\
     695              :                         FROM pg_catalog.pg_namespace\n\
     696              :                         WHERE nspname = 'public'\n\
     697              :                     )\n\
     698              :                     THEN\n\
     699              :                         ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\
     700              :                         ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\
     701              :                     END IF;\n\
     702              :                 END\n\
     703              :             $$;"
     704              :         .to_string();
     705              : 
     706              :         info!(
     707              :             "grant query for db {} : {}",
     708              :             &db.name,
     709              :             inlinify(&grant_query)
     710              :         );
     711              :         db_client.simple_query(&grant_query)?;
     712              : 
     713              :         // it is important to run this after all grants
     714              :         if enable_anon_extension {
     715              :             handle_extension_anon(spec, &db.owner, &mut db_client, false)
     716              :                 .context("handle_grants handle_extension_anon")?;
     717              :         }
     718              :     }
     719              : 
     720              :     Ok(())
     721              : }
     722              : 
     723              : /// Create the `pg_stat_statements` extension if `shared_preload_libraries` mentions it.
     724            0 : #[instrument(skip_all)]
     725              : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     726              :     if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
     727              :         if libs.contains("pg_stat_statements") {
     728              :             // Create extension only if this compute really needs it
     729              :             let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
     730              :             info!("creating system extensions with query: {}", query);
     731              :             client.simple_query(query)?;
     732              :         }
     733              :     }
     734              : 
     735              :     Ok(())
     736              : }
     737              : 
     738              : /// Create the `neon` schema and extension, move the extension into that schema and try to update it.
     739            0 : #[instrument(skip_all)]
     740              : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
     741              :     info!("handle extension neon");
     742              : 
     743              :     let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
     744              :     client.simple_query(query)?;
     745              : 
     746              :     query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
     747              :     info!("create neon extension with query: {}", query);
     748              :     client.simple_query(query)?;
     749              : 
     750              :     query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'"; // NOTE(review): presumably required for the SET SCHEMA below — confirm
     751              :     client.simple_query(query)?;
     752              : 
     753              :     query = "ALTER EXTENSION neon SET SCHEMA neon";
     754              :     info!("alter neon extension schema with query: {}", query);
     755              :     client.simple_query(query)?;
     756              : 
     757              :     // this will be a no-op if extension is already up to date,
     758              :     // which may happen in two cases:
     759              :     // - extension was just installed
     760              :     // - extension was already installed and is up to date
     761              :     let query = "ALTER EXTENSION neon UPDATE";
     762              :     info!("update neon extension version with query: {}", query);
     763              :     if let Err(e) = client.simple_query(query) { // best effort: the failure is logged, not returned
     764              :         error!(
     765              :             "failed to upgrade neon extension during `handle_extension_neon`: {}",
     766              :             e
     767              :         );
     768              :     }
     769              : 
     770              :     Ok(())
     771              : }
     772              : 
     773            0 : #[instrument(skip_all)]
     774              : pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
     775              :     info!("handle neon extension upgrade");
     776              :     let query = "ALTER EXTENSION neon UPDATE"; // the only statement this function runs
     777              :     info!("update neon extension version with query: {}", query);
     778              :     client.simple_query(query)?; // unlike in `handle_extension_neon`, a failure here is returned to the caller
     779              : 
     780              :     Ok(())
     781              : }
     782              : 
     783            0 : #[instrument(skip_all)]
     784              : pub fn handle_migrations(client: &mut Client) -> Result<()> {
     785              :     info!("handle migrations");
     786              : 
     787              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     788              :     // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
     789              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     790              : 
     791              :     // Add new migrations in numerical order; each entry is the SQL file text embedded via `include_str!`.
     792              :     let migrations = [
     793              :         include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
     794              :         include_str!("./migrations/0002-alter_roles.sql"),
     795              :         include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
     796              :         include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
     797              :         include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
     798              :         include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
     799              :         include_str!(
     800              :             "./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
     801              :         ),
     802              :         include_str!(
     803              :             "./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
     804              :         ),
     805              :         include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
     806              :         include_str!(
     807              :             "./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
     808              :         ),
     809              :         include_str!(
     810              :             "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
     811              :         ),
     812              :     ];
     813              : 
     814              :     MigrationRunner::new(client, &migrations).run_migrations()?; // runner errors are returned to the caller
     815              : 
     816              :     Ok(())
     817              : }
     818              : 
     819              : /// Pre-create and initialize the `anon` extension through `db_client` (skipped when
     820              : /// `grants_only` is set) and grant `db_owner` access to the objects in the `anon` schema.
     821              : ///
     822              : /// Nothing is done unless the `shared_preload_libraries` setting mentions `anon`.
     823              : ///
     824              : /// Failures of `CREATE EXTENSION` and `anon.init()`, as well as a missing extension, are
     825              : /// logged and the function still returns `Ok(())`. Failures of the grant statements are
     826              : /// returned to the caller.
     827            0 : #[instrument(skip_all)]
     828              : pub fn handle_extension_anon(
     829              :     spec: &ComputeSpec,
     830              :     db_owner: &str,
     831              :     db_client: &mut Client,
     832              :     grants_only: bool,
     833              : ) -> Result<()> {
     834              :     info!("handle extension anon");
     835              : 
     836              :     if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
     837              :         if libs.contains("anon") {
     838              :             if !grants_only {
     839              :                 // check if extension is already initialized using anon.is_initialized()
     840              :                 let query = "SELECT anon.is_initialized()";
     841              :                 match db_client.query(query, &[]) {
     842              :                     Ok(rows) => {
     843              :                         if !rows.is_empty() {
     844              :                             let is_initialized: bool = rows[0].get(0);
     845              :                             if is_initialized {
     846              :                                 info!("anon extension is already initialized");
     847              :                                 return Ok(());
     848              :                             }
     849              :                         }
     850              :                     }
     851              :                     Err(e) => {
     852              :                         // The query fails when the extension does not exist yet, which is
     853              :                         // the normal case on first run, hence only a warning.
     854              :                         warn!(
     855              :                             "anon extension is_initialized check failed with expected error: {}",
     856              :                             e
     857              :                         );
     858              :                     }
     859              :                 };
     860              : 
     861              :                 // Create anon extension if this compute needs it
     862              :                 // Users cannot create it themselves, because superuser is required.
     863              :                 let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
     864              :                 info!("creating anon extension with query: {}", query);
     865              :                 match db_client.query(query, &[]) {
     866              :                     Ok(_) => {}
     867              :                     Err(e) => {
     868              :                         error!("anon extension creation failed with error: {}", e);
     869              :                         return Ok(());
     870              :                     }
     871              :                 }
     872              : 
     873              :                 // check that extension is installed
     874              :                 query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
     875              :                 let rows = db_client.query(query, &[])?;
     876              :                 if rows.is_empty() {
     877              :                     error!("anon extension is not installed");
     878              :                     return Ok(());
     879              :                 }
     880              : 
     881              :                 // Initialize anon extension
     882              :                 // This also requires superuser privileges, so users cannot do it themselves.
     883              :                 query = "SELECT anon.init()";
     884              :                 match db_client.query(query, &[]) {
     885              :                     Ok(_) => {}
     886              :                     Err(e) => {
     887              :                         error!("anon.init() failed with error: {}", e);
     888              :                         return Ok(());
     889              :                     }
     890              :                 }
     891              :             }
     892              : 
     893              :             // check that extension is installed, if not bail early
     894              :             let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
     895              :             match db_client.query(query, &[]) {
     896              :                 Ok(rows) => {
     897              :                     if rows.is_empty() {
     898              :                         error!("anon extension is not installed");
     899              :                         return Ok(());
     900              :                     }
     901              :                 }
     902              :                 Err(e) => {
     903              :                     error!("anon extension check failed with error: {}", e);
     904              :                     return Ok(());
     905              :                 }
     906              :             };
     907              : 
     908              :             // `db_owner` is a raw role name. Quote it once before splicing it into the
     909              :             // statements below, the same way other identifiers in this file are quoted;
     910              :             // otherwise names that need quoting break the GRANTs.
     911              :             let owner_ident = db_owner.to_string().pg_quote();
     912              : 
     913              :             let query = format!("GRANT ALL ON SCHEMA anon TO {}", owner_ident);
     914              :             info!("granting anon extension permissions with query: {}", query);
     915              :             db_client.simple_query(&query)?;
     916              : 
     917              :             // Grant permissions to db_owner to use anon extension functions
     918              :             let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", owner_ident);
     919              :             info!("granting anon extension permissions with query: {}", query);
     920              :             db_client.simple_query(&query)?;
     921              : 
     922              :             // This is needed, because some functions are defined as SECURITY DEFINER.
     923              :             // In Postgres SECURITY DEFINER functions are executed with the privileges
     924              :             // of the owner.
     925              :             // In the anon extension this is needed to access some GUCs, which are only accessible
     926              :             // to superuser. But we've patched postgres to allow db_owner to access them as well.
     927              :             // So we need to change owner of these functions to db_owner.
     928              :             //
     929              :             // NOTE(review): the statement below only SELECTs the text of the `ALTER FUNCTION`
     930              :             // commands; the returned rows are discarded, so no ownership is actually changed.
     931              :             // Executing them would change which role SECURITY DEFINER functions run as, so
     932              :             // that behaviour is deliberately left untouched here — confirm the intent first.
     933              :             //
     934              :             // The quoted owner is passed as a SQL string literal and concatenated, so that
     935              :             // quote characters in the role name cannot terminate the surrounding literal.
     936              :             let query = format!("
     937              :                 SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO '||{}||';'
     938              :                 from pg_proc p
     939              :                 join pg_namespace nsp ON p.pronamespace = nsp.oid
     940              :                 where nsp.nspname = 'anon';", escape_literal(&owner_ident));
     941              : 
     942              :             info!("change anon extension functions owner to db owner");
     943              :             db_client.simple_query(&query)?;
     944              : 
     945              :             //  affects views as well
     946              :             let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", owner_ident);
     947              :             info!("granting anon extension permissions with query: {}", query);
     948              :             db_client.simple_query(&query)?;
     949              : 
     950              :             let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", owner_ident);
     951              :             info!("granting anon extension permissions with query: {}", query);
     952              :             db_client.simple_query(&query)?;
     953              :         }
     954              :     }
     955              : 
     956              :     Ok(())
     957              : }
        

Generated by: LCOV version 2.1-beta