|             Line data    Source code 
       1              : use std::fs::File;
       2              : use std::path::Path;
       3              : use std::str::FromStr;
       4              : 
       5              : use anyhow::{anyhow, bail, Context, Result};
       6              : use postgres::config::Config;
       7              : use postgres::{Client, NoTls};
       8              : use reqwest::StatusCode;
       9              : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
      10              : 
      11              : use crate::config;
      12              : use crate::logger::inlinify;
      13              : use crate::params::PG_HBA_ALL_MD5;
      14              : use crate::pg_helpers::*;
      15              : 
      16              : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
      17              : use compute_api::spec::{ComputeSpec, PgIdent, Role};
      18              : 
      19              : // Do control plane request and return response if any. In case of error it
      20              : // returns a bool flag indicating whether it makes sense to retry the request
      21              : // and a string with error message.
      22            0 : fn do_control_plane_request(
      23            0 :     uri: &str,
      24            0 :     jwt: &str,
      25            0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
      26            0 :     let resp = reqwest::blocking::Client::new()
      27            0 :         .get(uri)
      28            0 :         .header("Authorization", format!("Bearer {}", jwt))
      29            0 :         .send()
      30            0 :         .map_err(|e| {
      31            0 :             (
      32            0 :                 true,
      33            0 :                 format!("could not perform spec request to control plane: {}", e),
      34            0 :             )
      35            0 :         })?;
      36              : 
      37            0 :     match resp.status() {
      38            0 :         StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
      39            0 :             Ok(spec_resp) => Ok(spec_resp),
      40            0 :             Err(e) => Err((
      41            0 :                 true,
      42            0 :                 format!("could not deserialize control plane response: {}", e),
      43            0 :             )),
      44              :         },
      45              :         StatusCode::SERVICE_UNAVAILABLE => {
      46            0 :             Err((true, "control plane is temporarily unavailable".to_string()))
      47              :         }
      48              :         StatusCode::BAD_GATEWAY => {
      49              :             // We have a problem with intermittent 502 errors now
      50              :             // https://github.com/neondatabase/cloud/issues/2353
      51              :             // It's fine to retry GET request in this case.
      52            0 :             Err((true, "control plane request failed with 502".to_string()))
      53              :         }
      54              :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      55              :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      56            0 :         _ => Err((
      57            0 :             false,
      58            0 :             format!(
      59            0 :                 "unexpected control plane response status code: {}",
      60            0 :                 resp.status()
      61            0 :             ),
      62            0 :         )),
      63              :     }
      64            0 : }
      65              : 
      66              : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
      67              : /// env variable is set, it will be used for authorization.
      68            0 : pub fn get_spec_from_control_plane(
      69            0 :     base_uri: &str,
      70            0 :     compute_id: &str,
      71            0 : ) -> Result<Option<ComputeSpec>> {
      72            0 :     let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
      73            0 :     let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
      74            0 :         Ok(v) => v,
      75            0 :         Err(_) => "".to_string(),
      76              :     };
      77            0 :     let mut attempt = 1;
      78            0 :     let mut spec: Result<Option<ComputeSpec>> = Ok(None);
      79            0 : 
      80            0 :     info!("getting spec from control plane: {}", cp_uri);
      81              : 
      82              :     // Do 3 attempts to get spec from the control plane using the following logic:
      83              :     // - network error -> then retry
      84              :     // - compute id is unknown or any other error -> bail out
      85              :     // - no spec for compute yet (Empty state) -> return Ok(None)
      86              :     // - got spec -> return Ok(Some(spec))
      87            0 :     while attempt < 4 {
      88            0 :         spec = match do_control_plane_request(&cp_uri, &jwt) {
      89            0 :             Ok(spec_resp) => match spec_resp.status {
      90            0 :                 ControlPlaneComputeStatus::Empty => Ok(None),
      91              :                 ControlPlaneComputeStatus::Attached => {
      92            0 :                     if let Some(spec) = spec_resp.spec {
      93            0 :                         Ok(Some(spec))
      94              :                     } else {
      95            0 :                         bail!("compute is attached, but spec is empty")
      96              :                     }
      97              :                 }
      98              :             },
      99            0 :             Err((retry, msg)) => {
     100            0 :                 if retry {
     101            0 :                     Err(anyhow!(msg))
     102              :                 } else {
     103            0 :                     bail!(msg);
     104              :                 }
     105              :             }
     106              :         };
     107              : 
     108            0 :         if let Err(e) = &spec {
     109            0 :             error!("attempt {} to get spec failed with: {}", attempt, e);
     110              :         } else {
     111            0 :             return spec;
     112              :         }
     113              : 
     114            0 :         attempt += 1;
     115            0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     116              :     }
     117              : 
     118              :     // All attempts failed, return error.
     119            0 :     spec
     120            0 : }
     121              : 
     122              : /// Check `pg_hba.conf` and update if needed to allow external connections.
     123            0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     124            0 :     // XXX: consider making it a part of spec.json
     125            0 :     info!("checking pg_hba.conf");
     126            0 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     127            0 : 
     128            0 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     129            0 :         info!("updated pg_hba.conf to allow external connections");
     130              :     } else {
     131            0 :         info!("pg_hba.conf is up-to-date");
     132              :     }
     133              : 
     134            0 :     Ok(())
     135            0 : }
     136              : 
     137              : /// Create a standby.signal file
     138            0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     139            0 :     // XXX: consider making it a part of spec.json
     140            0 :     info!("adding standby.signal");
     141            0 :     let signalfile = pgdata_path.join("standby.signal");
     142            0 : 
     143            0 :     if !signalfile.exists() {
     144            0 :         info!("created standby.signal");
     145            0 :         File::create(signalfile)?;
     146              :     } else {
     147            0 :         info!("reused pre-existing standby.signal");
     148              :     }
     149            0 :     Ok(())
     150            0 : }
     151              : 
     152              : /// Compute could be unexpectedly shut down, for example, during the
     153              : /// database dropping. This leaves the database in the invalid state,
     154              : /// which prevents new db creation with the same name. This function
     155              : /// will clean it up before proceeding with catalog updates. All
     156              : /// possible future cleanup operations may go here too.
     157            0 : #[instrument(skip_all)]
     158              : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
     159              :     let existing_dbs = get_existing_dbs(client)?;
     160              : 
     161              :     for (_, db) in existing_dbs {
     162              :         if db.invalid {
     163              :             // After recent commit in Postgres, interrupted DROP DATABASE
     164              :             // leaves the database in the invalid state. According to the
     165              :             // commit message, the only option for user is to drop it again.
     166              :             // See:
     167              :             //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     168              :             //
     169              :             // Postgres Neon extension is done the way, that db is de-registered
     170              :             // in the control plane metadata only after it is dropped. So there is
     171              :             // a chance that it still thinks that db should exist. This means
     172              :             // that it will be re-created by `handle_databases()`. Yet, it's fine
     173              :             // as user can just repeat drop (in vanilla Postgres they would need
     174              :             // to do the same, btw).
     175              :             let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
     176              :             info!("dropping invalid database {}", db.name);
     177              :             client.execute(query.as_str(), &[])?;
     178              :         }
     179              :     }
     180              : 
     181              :     Ok(())
     182              : }
     183              : 
     184              : /// Given a cluster spec json and open transaction it handles roles creation,
     185              : /// deletion and update.
     186            0 : #[instrument(skip_all)]
     187              : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     188              :     let mut xact = client.transaction()?;
     189              :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     190              : 
     191              :     // Print a list of existing Postgres roles (only in debug mode)
     192              :     if span_enabled!(Level::INFO) {
     193              :         let mut vec = Vec::new();
     194              :         for r in &existing_roles {
     195              :             vec.push(format!(
     196              :                 "{}:{}",
     197              :                 r.name,
     198              :                 if r.encrypted_password.is_some() {
     199              :                     "[FILTERED]"
     200              :                 } else {
     201              :                     "(null)"
     202              :                 }
     203              :             ));
     204              :         }
     205              : 
     206              :         info!("postgres roles (total {}): {:?}", vec.len(), vec);
     207              :     }
     208              : 
     209              :     // Process delta operations first
     210              :     if let Some(ops) = &spec.delta_operations {
     211              :         info!("processing role renames");
     212              :         for op in ops {
     213              :             match op.action.as_ref() {
     214              :                 "delete_role" => {
     215              :                     // no-op now, roles will be deleted at the end of configuration
     216              :                 }
     217              :                 // Renaming role drops its password, since role name is
     218              :                 // used as a salt there.  It is important that this role
     219              :                 // is recorded with a new `name` in the `roles` list.
     220              :                 // Follow up roles update will set the new password.
     221              :                 "rename_role" => {
     222              :                     let new_name = op.new_name.as_ref().unwrap();
     223              : 
     224              :                     // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     225            0 :                     if existing_roles.iter().any(|r| r.name == op.name) {
     226              :                         let query: String = format!(
     227              :                             "ALTER ROLE {} RENAME TO {}",
     228              :                             op.name.pg_quote(),
     229              :                             new_name.pg_quote()
     230              :                         );
     231              : 
     232              :                         warn!("renaming role '{}' to '{}'", op.name, new_name);
     233              :                         xact.execute(query.as_str(), &[])?;
     234              :                     }
     235              :                 }
     236              :                 _ => {}
     237              :             }
     238              :         }
     239              :     }
     240              : 
     241              :     // Refresh Postgres roles info to handle possible roles renaming
     242              :     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     243              : 
     244              :     info!(
     245              :         "handling cluster spec roles (total {})",
     246              :         spec.cluster.roles.len()
     247              :     );
     248              :     for role in &spec.cluster.roles {
     249              :         let name = &role.name;
     250              :         // XXX: with a limited number of roles it is fine, but consider making it a HashMap
     251            0 :         let pg_role = existing_roles.iter().find(|r| r.name == *name);
     252              : 
     253              :         enum RoleAction {
     254              :             None,
     255              :             Update,
     256              :             Create,
     257              :         }
     258              :         let action = if let Some(r) = pg_role {
     259              :             if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
     260              :                 || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
     261              :             {
     262              :                 RoleAction::Update
     263              :             } else if let Some(pg_pwd) = &r.encrypted_password {
     264              :                 // Check whether password changed or not (trim 'md5' prefix first if any)
     265              :                 //
     266              :                 // This is a backward compatibility hack, which comes from the times when we were using
     267              :                 // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
     268              :                 // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
     269              :                 // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
     270              :                 // Here is the only place so far where we compare hashes, so it seems to be the best candidate
     271              :                 // to place this compatibility layer.
     272              :                 let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
     273              :                     stripped
     274              :                 } else {
     275              :                     pg_pwd
     276              :                 };
     277              :                 if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
     278              :                     RoleAction::Update
     279              :                 } else {
     280              :                     RoleAction::None
     281              :                 }
     282              :             } else {
     283              :                 RoleAction::None
     284              :             }
     285              :         } else {
     286              :             RoleAction::Create
     287              :         };
     288              : 
     289              :         match action {
     290              :             RoleAction::None => {}
     291              :             RoleAction::Update => {
     292              :                 // This can be run on /every/ role! Not just ones created through the console.
     293              :                 // This means that if you add some funny ALTER here that adds a permission,
     294              :                 // this will get run even on user-created roles! This will result in different
     295              :                 // behavior before and after a spec gets reapplied. The below ALTER as it stands
     296              :                 // now only grants LOGIN and changes the password. Please do not allow this branch
     297              :                 // to do anything silly.
     298              :                 let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
     299              :                 query.push_str(&role.to_pg_options());
     300              :                 xact.execute(query.as_str(), &[])?;
     301              :             }
     302              :             RoleAction::Create => {
     303              :                 // This branch only runs when roles are created through the console, so it is
     304              :                 // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
     305              :                 // from neon_superuser.
     306              :                 let mut query: String = format!(
     307              :                     "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
     308              :                     name.pg_quote()
     309              :                 );
     310              :                 info!("running role create query: '{}'", &query);
     311              :                 query.push_str(&role.to_pg_options());
     312              :                 xact.execute(query.as_str(), &[])?;
     313              :             }
     314              :         }
     315              : 
     316              :         if span_enabled!(Level::INFO) {
     317              :             let pwd = if role.encrypted_password.is_some() {
     318              :                 "[FILTERED]"
     319              :             } else {
     320              :                 "(null)"
     321              :             };
     322              :             let action_str = match action {
     323              :                 RoleAction::None => "",
     324              :                 RoleAction::Create => " -> create",
     325              :                 RoleAction::Update => " -> update",
     326              :             };
     327              :             info!(" - {}:{}{}", name, pwd, action_str);
     328              :         }
     329              :     }
     330              : 
     331              :     xact.commit()?;
     332              : 
     333              :     Ok(())
     334              : }
     335              : 
     336              : /// Reassign all dependent objects and delete requested roles.
     337            0 : #[instrument(skip_all)]
     338              : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
     339              :     if let Some(ops) = &spec.delta_operations {
     340              :         // First, reassign all dependent objects to db owners.
     341              :         info!("reassigning dependent objects of to-be-deleted roles");
     342              : 
     343              :         // Fetch existing roles. We could've exported and used `existing_roles` from
     344              :         // `handle_roles()`, but we only make this list there before creating new roles.
     345              :         // Which is probably fine as we never create to-be-deleted roles, but that'd
     346              :         // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
     347              :         // buffers already, so this shouldn't be a big deal.
     348              :         let mut xact = client.transaction()?;
     349              :         let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
     350              :         xact.commit()?;
     351              : 
     352              :         for op in ops {
     353              :             // Check that role is still present in Postgres, as this could be a
     354              :             // restart with the same spec after role deletion.
     355            0 :             if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
     356              :                 reassign_owned_objects(spec, connstr, &op.name)?;
     357              :             }
     358              :         }
     359              : 
     360              :         // Second, proceed with role deletions.
     361              :         info!("processing role deletions");
     362              :         let mut xact = client.transaction()?;
     363              :         for op in ops {
     364              :             // We do not check either role exists or not,
     365              :             // Postgres will take care of it for us
     366              :             if op.action == "delete_role" {
     367              :                 let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
     368              : 
     369              :                 warn!("deleting role '{}'", &op.name);
     370              :                 xact.execute(query.as_str(), &[])?;
     371              :             }
     372              :         }
     373              :         xact.commit()?;
     374              :     }
     375              : 
     376              :     Ok(())
     377              : }
     378              : 
     379            0 : fn reassign_owned_objects_in_one_db(
     380            0 :     conf: Config,
     381            0 :     role_name: &PgIdent,
     382            0 :     db_owner: &PgIdent,
     383            0 : ) -> Result<()> {
     384            0 :     let mut client = conf.connect(NoTls)?;
     385              : 
     386              :     // This will reassign all dependent objects to the db owner
     387            0 :     let reassign_query = format!(
     388            0 :         "REASSIGN OWNED BY {} TO {}",
     389            0 :         role_name.pg_quote(),
     390            0 :         db_owner.pg_quote()
     391            0 :     );
     392            0 :     info!(
     393            0 :         "reassigning objects owned by '{}' in db '{}' to '{}'",
     394            0 :         role_name,
     395            0 :         conf.get_dbname().unwrap_or(""),
     396              :         db_owner
     397              :     );
     398            0 :     client.simple_query(&reassign_query)?;
     399              : 
     400              :     // This now will only drop privileges of the role
     401            0 :     let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
     402            0 :     client.simple_query(&drop_query)?;
     403            0 :     Ok(())
     404            0 : }
     405              : 
     406              : // Reassign all owned objects in all databases to the owner of the database.
     407            0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
     408            0 :     for db in &spec.cluster.databases {
     409            0 :         if db.owner != *role_name {
     410            0 :             let mut conf = Config::from_str(connstr)?;
     411            0 :             conf.dbname(&db.name);
     412            0 :             reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
     413            0 :         }
     414              :     }
     415              : 
     416              :     // Also handle case when there are no databases in the spec.
     417              :     // In this case we need to reassign objects in the default database.
     418            0 :     let conf = Config::from_str(connstr)?;
     419            0 :     let db_owner = PgIdent::from_str("cloud_admin")?;
     420            0 :     reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
     421              : 
     422            0 :     Ok(())
     423            0 : }
     424              : 
     425              : /// It follows mostly the same logic as `handle_roles()`, except that we
     426              : /// do not use an explicit transaction block, since major database operations
     427              : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
     428              : /// atomicity should be enough here due to the order of operations and various checks,
     429              : /// which together provide us idempotency.
     430            0 : #[instrument(skip_all)]
     431              : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     432              :     let existing_dbs = get_existing_dbs(client)?;
     433              : 
     434              :     // Print a list of existing Postgres databases (only in debug mode)
     435              :     if span_enabled!(Level::INFO) {
     436              :         let mut vec = Vec::new();
     437              :         for (dbname, db) in &existing_dbs {
     438              :             vec.push(format!("{}:{}", dbname, db.owner));
     439              :         }
     440              :         info!("postgres databases (total {}): {:?}", vec.len(), vec);
     441              :     }
     442              : 
     443              :     // Process delta operations first
     444              :     if let Some(ops) = &spec.delta_operations {
     445              :         info!("processing delta operations on databases");
     446              :         for op in ops {
     447              :             match op.action.as_ref() {
     448              :                 // We do not check either DB exists or not,
     449              :                 // Postgres will take care of it for us
     450              :                 "delete_db" => {
     451              :                     // In Postgres we can't drop a database if it is a template.
     452              :                     // So we need to unset the template flag first, but it could
     453              :                     // be a retry, so we could've already dropped the database.
     454              :                     // Check that database exists first to make it idempotent.
     455              :                     let unset_template_query: String = format!(
     456              :                         "
     457              :                         DO $$
     458              :                         BEGIN
     459              :                             IF EXISTS(
     460              :                                 SELECT 1
     461              :                                 FROM pg_catalog.pg_database
     462              :                                 WHERE datname = {}
     463              :                             )
     464              :                             THEN
     465              :                             ALTER DATABASE {} is_template false;
     466              :                             END IF;
     467              :                         END
     468              :                         $$;",
     469              :                         escape_literal(&op.name),
     470              :                         &op.name.pg_quote()
     471              :                     );
     472              :                     // Use FORCE to drop database even if there are active connections.
     473              :                     // We run this from `cloud_admin`, so it should have enough privileges.
     474              :                     // NB: there could be other db states, which prevent us from dropping
     475              :                     // the database. For example, if db is used by any active subscription
     476              :                     // or replication slot.
     477              :                     // TODO: deal with it once we allow logical replication. Proper fix should
     478              :                     // involve returning an error code to the control plane, so it could
     479              :                     // figure out that this is a non-retryable error, return it to the user
     480              :                     // and fail operation permanently.
     481              :                     let drop_db_query: String = format!(
     482              :                         "DROP DATABASE IF EXISTS {} WITH (FORCE)",
     483              :                         &op.name.pg_quote()
     484              :                     );
     485              : 
     486              :                     warn!("deleting database '{}'", &op.name);
     487              :                     client.execute(unset_template_query.as_str(), &[])?;
     488              :                     client.execute(drop_db_query.as_str(), &[])?;
     489              :                 }
     490              :                 "rename_db" => {
     491              :                     let new_name = op.new_name.as_ref().unwrap();
     492              : 
     493              :                     if existing_dbs.contains_key(&op.name) {
     494              :                         let query: String = format!(
     495              :                             "ALTER DATABASE {} RENAME TO {}",
     496              :                             op.name.pg_quote(),
     497              :                             new_name.pg_quote()
     498              :                         );
     499              : 
     500              :                         warn!("renaming database '{}' to '{}'", op.name, new_name);
     501              :                         client.execute(query.as_str(), &[])?;
     502              :                     }
     503              :                 }
     504              :                 _ => {}
     505              :             }
     506              :         }
     507              :     }
     508              : 
     509              :     // Refresh Postgres databases info to handle possible renames
     510              :     let existing_dbs = get_existing_dbs(client)?;
     511              : 
     512              :     info!(
     513              :         "handling cluster spec databases (total {})",
     514              :         spec.cluster.databases.len()
     515              :     );
     516              :     for db in &spec.cluster.databases {
     517              :         let name = &db.name;
     518              :         let pg_db = existing_dbs.get(name);
     519              : 
     520              :         enum DatabaseAction {
     521              :             None,
     522              :             Update,
     523              :             Create,
     524              :         }
     525              :         let action = if let Some(r) = pg_db {
     526              :             // XXX: db owner name is returned as quoted string from Postgres,
     527              :             // when quoting is needed.
     528              :             let new_owner = if r.owner.starts_with('"') {
     529              :                 db.owner.pg_quote()
     530              :             } else {
     531              :                 db.owner.clone()
     532              :             };
     533              : 
     534              :             if new_owner != r.owner {
     535              :                 // Update the owner
     536              :                 DatabaseAction::Update
     537              :             } else {
     538              :                 DatabaseAction::None
     539              :             }
     540              :         } else {
     541              :             DatabaseAction::Create
     542              :         };
     543              : 
     544              :         match action {
     545              :             DatabaseAction::None => {}
     546              :             DatabaseAction::Update => {
     547              :                 let query: String = format!(
     548              :                     "ALTER DATABASE {} OWNER TO {}",
     549              :                     name.pg_quote(),
     550              :                     db.owner.pg_quote()
     551              :                 );
     552              :                 let _guard = info_span!("executing", query).entered();
     553              :                 client.execute(query.as_str(), &[])?;
     554              :             }
     555              :             DatabaseAction::Create => {
     556              :                 let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
     557              :                 query.push_str(&db.to_pg_options());
     558              :                 let _guard = info_span!("executing", query).entered();
     559              :                 client.execute(query.as_str(), &[])?;
     560              :                 let grant_query: String = format!(
     561              :                     "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
     562              :                     name.pg_quote()
     563              :                 );
     564              :                 client.execute(grant_query.as_str(), &[])?;
     565              :             }
     566              :         };
     567              : 
     568              :         if span_enabled!(Level::INFO) {
     569              :             let action_str = match action {
     570              :                 DatabaseAction::None => "",
     571              :                 DatabaseAction::Create => " -> create",
     572              :                 DatabaseAction::Update => " -> update",
     573              :             };
     574              :             info!(" - {}:{}{}", db.name, db.owner, action_str);
     575              :         }
     576              :     }
     577              : 
     578              :     Ok(())
     579              : }
     580              : 
     581              : /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
     582              : /// to allow users creating trusted extensions and re-creating `public` schema, for example.
     583            0 : #[instrument(skip_all)]
     584              : pub fn handle_grants(
     585              :     spec: &ComputeSpec,
     586              :     client: &mut Client,
     587              :     connstr: &str,
     588              :     enable_anon_extension: bool,
     589              : ) -> Result<()> {
     590              :     info!("modifying database permissions");
     591              :     let existing_dbs = get_existing_dbs(client)?;
     592              : 
     593              :     // Do some per-database access adjustments. We'd better do this at db creation time,
     594              :     // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
     595              :     // atomically.
     596              :     for db in &spec.cluster.databases {
     597              :         match existing_dbs.get(&db.name) {
     598              :             Some(pg_db) => {
     599              :                 if pg_db.restrict_conn || pg_db.invalid {
     600              :                     info!(
     601              :                         "skipping grants for db {} (invalid: {}, connections not allowed: {})",
     602              :                         db.name, pg_db.invalid, pg_db.restrict_conn
     603              :                     );
     604              :                     continue;
     605              :                 }
     606              :             }
     607              :             None => {
     608              :                 bail!(
     609              :                     "database {} doesn't exist in Postgres after handle_databases()",
     610              :                     db.name
     611              :                 );
     612              :             }
     613              :         }
     614              : 
     615              :         let mut conf = Config::from_str(connstr)?;
     616              :         conf.dbname(&db.name);
     617              : 
     618              :         let mut db_client = conf.connect(NoTls)?;
     619              : 
     620              :         // This will only change ownership on the schema itself, not the objects
     621              :         // inside it. Without it owner of the `public` schema will be `cloud_admin`
     622              :         // and database owner cannot do anything with it. SQL procedure ensures
     623              :         // that it won't error out if schema `public` doesn't exist.
     624              :         let alter_query = format!(
     625              :             "DO $$\n\
     626              :                 DECLARE\n\
     627              :                     schema_owner TEXT;\n\
     628              :                 BEGIN\n\
     629              :                     IF EXISTS(\n\
     630              :                         SELECT nspname\n\
     631              :                         FROM pg_catalog.pg_namespace\n\
     632              :                         WHERE nspname = 'public'\n\
     633              :                     )\n\
     634              :                     THEN\n\
     635              :                         SELECT nspowner::regrole::text\n\
     636              :                             FROM pg_catalog.pg_namespace\n\
     637              :                             WHERE nspname = 'public'\n\
     638              :                             INTO schema_owner;\n\
     639              :                 \n\
     640              :                         IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
     641              :                         THEN\n\
     642              :                             ALTER SCHEMA public OWNER TO {};\n\
     643              :                         END IF;\n\
     644              :                     END IF;\n\
     645              :                 END\n\
     646              :             $$;",
     647              :             db.owner.pg_quote()
     648              :         );
     649              :         db_client.simple_query(&alter_query)?;
     650              : 
     651              :         // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
     652              :         // This is needed because since postgres 15 this privilege is removed by default.
     653              :         // TODO: web_access isn't created for almost 1 year. It could be that we have
     654              :         // active users of 1 year old projects, but hopefully not, so check it and
     655              :         // remove this code if possible. The worst thing that could happen is that
     656              :         // user won't be able to use public schema in NEW databases created in the
     657              :         // very OLD project.
     658              :         //
     659              :         // Also, alter default permissions so that relations created by extensions can be
     660              :         // used by neon_superuser without permission issues.
     661              :         let grant_query = "DO $$\n\
     662              :                 BEGIN\n\
     663              :                     IF EXISTS(\n\
     664              :                         SELECT nspname\n\
     665              :                         FROM pg_catalog.pg_namespace\n\
     666              :                         WHERE nspname = 'public'\n\
     667              :                     ) AND\n\
     668              :                     current_setting('server_version_num')::int/10000 >= 15\n\
     669              :                     THEN\n\
     670              :                         IF EXISTS(\n\
     671              :                             SELECT rolname\n\
     672              :                             FROM pg_catalog.pg_roles\n\
     673              :                             WHERE rolname = 'web_access'\n\
     674              :                         )\n\
     675              :                         THEN\n\
     676              :                             GRANT CREATE ON SCHEMA public TO web_access;\n\
     677              :                         END IF;\n\
     678              :                     END IF;\n\
     679              :                     IF EXISTS(\n\
     680              :                         SELECT nspname\n\
     681              :                         FROM pg_catalog.pg_namespace\n\
     682              :                         WHERE nspname = 'public'\n\
     683              :                     )\n\
     684              :                     THEN\n\
     685              :                         ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\
     686              :                         ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\
     687              :                     END IF;\n\
     688              :                 END\n\
     689              :             $$;"
     690              :         .to_string();
     691              : 
     692              :         info!(
     693              :             "grant query for db {} : {}",
     694              :             &db.name,
     695              :             inlinify(&grant_query)
     696              :         );
     697              :         db_client.simple_query(&grant_query)?;
     698              : 
     699              :         // it is important to run this after all grants
     700              :         if enable_anon_extension {
     701              :             handle_extension_anon(spec, &db.owner, &mut db_client, false)
     702              :                 .context("handle_grants handle_extension_anon")?;
     703              :         }
     704              :     }
     705              : 
     706              :     Ok(())
     707              : }
     708              : 
     709              : /// Create required system extensions
     710            0 : #[instrument(skip_all)]
     711              : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     712              :     if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
     713              :         if libs.contains("pg_stat_statements") {
     714              :             // Create extension only if this compute really needs it
     715              :             let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
     716              :             info!("creating system extensions with query: {}", query);
     717              :             client.simple_query(query)?;
     718              :         }
     719              :     }
     720              : 
     721              :     Ok(())
     722              : }
     723              : 
     724              : /// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
     725            0 : #[instrument(skip_all)]
     726              : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
     727              :     info!("handle extension neon");
     728              : 
     729              :     let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
     730              :     client.simple_query(query)?;
     731              : 
     732              :     query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
     733              :     info!("create neon extension with query: {}", query);
     734              :     client.simple_query(query)?;
     735              : 
     736              :     query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
     737              :     client.simple_query(query)?;
     738              : 
     739              :     query = "ALTER EXTENSION neon SET SCHEMA neon";
     740              :     info!("alter neon extension schema with query: {}", query);
     741              :     client.simple_query(query)?;
     742              : 
     743              :     // this will be a no-op if extension is already up to date,
     744              :     // which may happen in two cases:
     745              :     // - extension was just installed
     746              :     // - extension was already installed and is up to date
     747              :     let query = "ALTER EXTENSION neon UPDATE";
     748              :     info!("update neon extension version with query: {}", query);
     749              :     if let Err(e) = client.simple_query(query) {
     750              :         error!(
     751              :             "failed to upgrade neon extension during `handle_extension_neon`: {}",
     752              :             e
     753              :         );
     754              :     }
     755              : 
     756              :     Ok(())
     757              : }
     758              : 
     759            0 : #[instrument(skip_all)]
     760              : pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
     761              :     info!("handle neon extension upgrade");
     762              :     let query = "ALTER EXTENSION neon UPDATE";
     763              :     info!("update neon extension version with query: {}", query);
     764              :     client.simple_query(query)?;
     765              : 
     766              :     Ok(())
     767              : }
     768              : 
     769            0 : #[instrument(skip_all)]
     770              : pub fn handle_migrations(client: &mut Client) -> Result<()> {
     771              :     info!("handle migrations");
     772              : 
     773              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     774              :     // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
     775              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     776              : 
     777              :     // Add new migrations in numerical order.
     778              :     let migrations = [
     779              :         include_str!("./migrations/0000-neon_superuser_bypass_rls.sql"),
     780              :         include_str!("./migrations/0001-alter_roles.sql"),
     781              :         include_str!("./migrations/0002-grant_pg_create_subscription_to_neon_superuser.sql"),
     782              :         include_str!("./migrations/0003-grant_pg_monitor_to_neon_superuser.sql"),
     783              :         include_str!("./migrations/0004-grant_all_on_tables_to_neon_superuser.sql"),
     784              :         include_str!("./migrations/0005-grant_all_on_sequences_to_neon_superuser.sql"),
     785              :         include_str!(
     786              :             "./migrations/0006-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
     787              :         ),
     788              :         include_str!(
     789              :             "./migrations/0007-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
     790              :         ),
     791              :         include_str!("./migrations/0008-revoke_replication_for_previously_allowed_roles.sql"),
     792              :     ];
     793              : 
     794            0 :     let mut func = || {
     795            0 :         let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
     796            0 :         client.simple_query(query)?;
     797              : 
     798            0 :         let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
     799            0 :         client.simple_query(query)?;
     800              : 
     801            0 :         let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
     802            0 :         client.simple_query(query)?;
     803              : 
     804            0 :         let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
     805            0 :         client.simple_query(query)?;
     806              : 
     807            0 :         let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
     808            0 :         client.simple_query(query)?;
     809            0 :         Ok::<_, anyhow::Error>(())
     810            0 :     };
     811              :     func().context("handle_migrations prepare")?;
     812              : 
     813              :     let query = "SELECT id FROM neon_migration.migration_id";
     814              :     let row = client
     815              :         .query_one(query, &[])
     816              :         .context("handle_migrations get migration_id")?;
     817              :     let mut current_migration: usize = row.get::<&str, i64>("id") as usize;
     818              :     let starting_migration_id = current_migration;
     819              : 
     820              :     let query = "BEGIN";
     821              :     client
     822              :         .simple_query(query)
     823              :         .context("handle_migrations begin")?;
     824              : 
     825              :     while current_migration < migrations.len() {
     826              :         let migration = &migrations[current_migration];
     827              :         if migration.starts_with("-- SKIP") {
     828              :             info!("Skipping migration id={}", current_migration);
     829              :         } else {
     830              :             info!(
     831              :                 "Running migration id={}:\n{}\n",
     832              :                 current_migration, migration
     833              :             );
     834            0 :             client.simple_query(migration).with_context(|| {
     835            0 :                 format!("handle_migrations current_migration={}", current_migration)
     836            0 :             })?;
     837              :         }
     838              :         current_migration += 1;
     839              :     }
     840              :     let setval = format!(
     841              :         "UPDATE neon_migration.migration_id SET id={}",
     842              :         migrations.len()
     843              :     );
     844              :     client
     845              :         .simple_query(&setval)
     846              :         .context("handle_migrations update id")?;
     847              : 
     848              :     let query = "COMMIT";
     849              :     client
     850              :         .simple_query(query)
     851              :         .context("handle_migrations commit")?;
     852              : 
     853              :     info!(
     854              :         "Ran {} migrations",
     855              :         (migrations.len() - starting_migration_id)
     856              :     );
     857              : 
     858              :     Ok(())
     859              : }
     860              : 
     861              : /// Connect to the database as superuser and pre-create anon extension
     862              : /// if it is present in shared_preload_libraries
     863            0 : #[instrument(skip_all)]
     864              : pub fn handle_extension_anon(
     865              :     spec: &ComputeSpec,
     866              :     db_owner: &str,
     867              :     db_client: &mut Client,
     868              :     grants_only: bool,
     869              : ) -> Result<()> {
     870              :     info!("handle extension anon");
     871              : 
     872              :     if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
     873              :         if libs.contains("anon") {
     874              :             if !grants_only {
     875              :                 // check if extension is already initialized using anon.is_initialized()
     876              :                 let query = "SELECT anon.is_initialized()";
     877              :                 match db_client.query(query, &[]) {
     878              :                     Ok(rows) => {
     879              :                         if !rows.is_empty() {
     880              :                             let is_initialized: bool = rows[0].get(0);
     881              :                             if is_initialized {
     882              :                                 info!("anon extension is already initialized");
     883              :                                 return Ok(());
     884              :                             }
     885              :                         }
     886              :                     }
     887              :                     Err(e) => {
     888              :                         warn!(
     889              :                             "anon extension is_installed check failed with expected error: {}",
     890              :                             e
     891              :                         );
     892              :                     }
     893              :                 };
     894              : 
     895              :                 // Create anon extension if this compute needs it
     896              :                 // Users cannot create it themselves, because superuser is required.
     897              :                 let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
     898              :                 info!("creating anon extension with query: {}", query);
     899              :                 match db_client.query(query, &[]) {
     900              :                     Ok(_) => {}
     901              :                     Err(e) => {
     902              :                         error!("anon extension creation failed with error: {}", e);
     903              :                         return Ok(());
     904              :                     }
     905              :                 }
     906              : 
     907              :                 // check that extension is installed
     908              :                 query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
     909              :                 let rows = db_client.query(query, &[])?;
     910              :                 if rows.is_empty() {
     911              :                     error!("anon extension is not installed");
     912              :                     return Ok(());
     913              :                 }
     914              : 
     915              :                 // Initialize anon extension
     916              :                 // This also requires superuser privileges, so users cannot do it themselves.
     917              :                 query = "SELECT anon.init()";
     918              :                 match db_client.query(query, &[]) {
     919              :                     Ok(_) => {}
     920              :                     Err(e) => {
     921              :                         error!("anon.init() failed with error: {}", e);
     922              :                         return Ok(());
     923              :                     }
     924              :                 }
     925              :             }
     926              : 
     927              :             // check that extension is installed, if not bail early
     928              :             let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
     929              :             match db_client.query(query, &[]) {
     930              :                 Ok(rows) => {
     931              :                     if rows.is_empty() {
     932              :                         error!("anon extension is not installed");
     933              :                         return Ok(());
     934              :                     }
     935              :                 }
     936              :                 Err(e) => {
     937              :                     error!("anon extension check failed with error: {}", e);
     938              :                     return Ok(());
     939              :                 }
     940              :             };
     941              : 
     942              :             let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
     943              :             info!("granting anon extension permissions with query: {}", query);
     944              :             db_client.simple_query(&query)?;
     945              : 
     946              :             // Grant permissions to db_owner to use anon extension functions
     947              :             let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
     948              :             info!("granting anon extension permissions with query: {}", query);
     949              :             db_client.simple_query(&query)?;
     950              : 
     951              :             // This is needed, because some functions are defined as SECURITY DEFINER.
     952              :             // In Postgres SECURITY DEFINER functions are executed with the privileges
     953              :             // of the owner.
     954              :             // In anon extension this it is needed to access some GUCs, which are only accessible to
     955              :             // superuser. But we've patched postgres to allow db_owner to access them as well.
     956              :             // So we need to change owner of these functions to db_owner.
     957              :             let query = format!("
     958              :                 SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};'
     959              :                 from pg_proc p
     960              :                 join pg_namespace nsp ON p.pronamespace = nsp.oid
     961              :                 where nsp.nspname = 'anon';", db_owner);
     962              : 
     963              :             info!("change anon extension functions owner to db owner");
     964              :             db_client.simple_query(&query)?;
     965              : 
     966              :             //  affects views as well
     967              :             let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
     968              :             info!("granting anon extension permissions with query: {}", query);
     969              :             db_client.simple_query(&query)?;
     970              : 
     971              :             let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
     972              :             info!("granting anon extension permissions with query: {}", query);
     973              :             db_client.simple_query(&query)?;
     974              :         }
     975              :     }
     976              : 
     977              :     Ok(())
     978              : }
         |