LCOV - code coverage report
Current view: top level - compute_tools/src - spec_apply.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 0.0 % 542 0
Test Date: 2025-02-20 13:11:02 Functions: 0.0 % 33 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::fmt::{Debug, Formatter};
       3              : use std::future::Future;
       4              : use std::iter::empty;
       5              : use std::iter::once;
       6              : use std::sync::Arc;
       7              : 
       8              : use crate::compute::construct_superuser_query;
       9              : use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
      10              : use anyhow::{bail, Result};
      11              : use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
      12              : use futures::future::join_all;
      13              : use tokio::sync::RwLock;
      14              : use tokio_postgres::Client;
      15              : use tracing::{debug, info_span, Instrument};
      16              : 
      17              : #[derive(Clone)]
      18              : pub enum DB {
      19              :     SystemDB,
      20              :     UserDB(Database),
      21              : }
      22              : 
      23              : impl DB {
      24            0 :     pub fn new(db: Database) -> DB {
      25            0 :         Self::UserDB(db)
      26            0 :     }
      27              : 
      28            0 :     pub fn is_owned_by(&self, role: &PgIdent) -> bool {
      29            0 :         match self {
      30            0 :             DB::SystemDB => false,
      31            0 :             DB::UserDB(db) => &db.owner == role,
      32              :         }
      33            0 :     }
      34              : }
      35              : 
      36              : impl Debug for DB {
      37            0 :     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
      38            0 :         match self {
      39            0 :             DB::SystemDB => f.debug_tuple("SystemDB").finish(),
      40            0 :             DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(),
      41              :         }
      42            0 :     }
      43              : }
      44              : 
      45              : #[derive(Copy, Clone, Debug)]
      46              : pub enum PerDatabasePhase {
      47              :     DeleteDBRoleReferences,
      48              :     ChangeSchemaPerms,
      49              :     HandleAnonExtension,
      50              :     DropLogicalSubscriptions,
      51              : }
      52              : 
      53              : #[derive(Clone, Debug)]
      54              : pub enum ApplySpecPhase {
      55              :     CreateSuperUser,
      56              :     DropInvalidDatabases,
      57              :     RenameRoles,
      58              :     CreateAndAlterRoles,
      59              :     RenameAndDeleteDatabases,
      60              :     CreateAndAlterDatabases,
      61              :     CreateSchemaNeon,
      62              :     RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
      63              :     HandleOtherExtensions,
      64              :     HandleNeonExtension,
      65              :     CreateAvailabilityCheck,
      66              :     DropRoles,
      67              :     FinalizeDropLogicalSubscriptions,
      68              : }
      69              : 
      70              : pub struct Operation {
      71              :     pub query: String,
      72              :     pub comment: Option<String>,
      73              : }
      74              : 
      75              : pub struct MutableApplyContext {
      76              :     pub roles: HashMap<String, Role>,
      77              :     pub dbs: HashMap<String, Database>,
      78              : }
      79              : 
      80              : /// Apply the operations that belong to the given spec apply phase.
      81              : ///
      82              : /// Commands within a single phase are executed in order of Iterator yield.
      83              : /// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
      84              : /// indicated by its `db` field, and can share a single client for all changes
      85              : /// to that database.
      86              : ///
      87              : /// Notes:
      88              : /// - Commands are pipelined, and thus may cause incomplete apply if one
      89              : ///   command of many fails.
      90              : /// - Failing commands will fail the phase's apply step once the return value
      91              : ///   is processed.
      92              : /// - No timeouts have (yet) been implemented.
      93              : /// - The caller is responsible for limiting and/or applying concurrency.
      94            0 : pub async fn apply_operations<'a, Fut, F>(
      95            0 :     spec: Arc<ComputeSpec>,
      96            0 :     ctx: Arc<RwLock<MutableApplyContext>>,
      97            0 :     jwks_roles: Arc<HashSet<String>>,
      98            0 :     apply_spec_phase: ApplySpecPhase,
      99            0 :     client: F,
     100            0 : ) -> Result<()>
     101            0 : where
     102            0 :     F: FnOnce() -> Fut,
     103            0 :     Fut: Future<Output = Result<&'a Client>>,
     104            0 : {
     105            0 :     debug!("Starting phase {:?}", &apply_spec_phase);
     106            0 :     let span = info_span!("db_apply_changes", phase=?apply_spec_phase);
     107            0 :     let span2 = span.clone();
     108            0 :     async move {
     109            0 :         debug!("Processing phase {:?}", &apply_spec_phase);
     110            0 :         let ctx = ctx;
     111              : 
     112            0 :         let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase)
     113            0 :             .await?
     114            0 :             .peekable();
     115            0 : 
     116            0 :         // Return (and by doing so, skip requesting the PostgreSQL client) if
     117            0 :         // we don't have any operations scheduled.
     118            0 :         if ops.peek().is_none() {
     119            0 :             return Ok(());
     120            0 :         }
     121              : 
     122            0 :         let client = client().await?;
     123              : 
     124            0 :         debug!("Applying phase {:?}", &apply_spec_phase);
     125              : 
     126            0 :         let active_queries = ops
     127            0 :             .map(|op| {
     128            0 :                 let Operation { comment, query } = op;
     129            0 :                 let inspan = match comment {
     130            0 :                     None => span.clone(),
     131            0 :                     Some(comment) => info_span!("phase {}: {}", comment),
     132              :                 };
     133              : 
     134            0 :                 async {
     135            0 :                     let query = query;
     136            0 :                     let res = client.simple_query(&query).await;
     137            0 :                     debug!(
     138            0 :                         "{} {}",
     139            0 :                         if res.is_ok() {
     140            0 :                             "successfully executed"
     141              :                         } else {
     142            0 :                             "failed to execute"
     143              :                         },
     144              :                         query
     145              :                     );
     146            0 :                     res
     147            0 :                 }
     148            0 :                 .instrument(inspan)
     149            0 :             })
     150            0 :             .collect::<Vec<_>>();
     151            0 : 
     152            0 :         drop(ctx);
     153              : 
     154            0 :         for it in join_all(active_queries).await {
     155            0 :             drop(it?);
     156              :         }
     157              : 
     158            0 :         debug!("Completed phase {:?}", &apply_spec_phase);
     159              : 
     160            0 :         Ok(())
     161            0 :     }
     162            0 :     .instrument(span2)
     163            0 :     .await
     164            0 : }
     165              : 
     166              : /// Create a stream of operations to be executed for that phase of applying
     167              : /// changes.
     168              : ///
     169              : /// In the future we may generate a single stream of changes and then
     170              : /// sort/merge/batch execution, but for now this is a nice way to improve
     171              : /// batching behaviour of the commands.
     172            0 : async fn get_operations<'a>(
     173            0 :     spec: &'a ComputeSpec,
     174            0 :     ctx: &'a RwLock<MutableApplyContext>,
     175            0 :     jwks_roles: &'a HashSet<String>,
     176            0 :     apply_spec_phase: &'a ApplySpecPhase,
     177            0 : ) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
     178            0 :     match apply_spec_phase {
     179              :         ApplySpecPhase::CreateSuperUser => {
     180            0 :             let query = construct_superuser_query(spec);
     181            0 : 
     182            0 :             Ok(Box::new(once(Operation {
     183            0 :                 query,
     184            0 :                 comment: None,
     185            0 :             })))
     186              :         }
     187              :         ApplySpecPhase::DropInvalidDatabases => {
     188            0 :             let mut ctx = ctx.write().await;
     189            0 :             let databases = &mut ctx.dbs;
     190            0 : 
     191            0 :             let keys: Vec<_> = databases
     192            0 :                 .iter()
     193            0 :                 .filter(|(_, db)| db.invalid)
     194            0 :                 .map(|(dbname, _)| dbname.clone())
     195            0 :                 .collect();
     196            0 : 
     197            0 :             // After recent commit in Postgres, interrupted DROP DATABASE
     198            0 :             // leaves the database in the invalid state. According to the
     199            0 :             // commit message, the only option for user is to drop it again.
     200            0 :             // See:
     201            0 :             //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     202            0 :             //
     203            0 :             // Postgres Neon extension is done the way, that db is de-registered
     204            0 :             // in the control plane metadata only after it is dropped. So there is
     205            0 :             // a chance that it still thinks that the db should exist. This means
     206            0 :             // that it will be re-created by the `CreateDatabases` phase. This
     207            0 :             // is fine, as user can just drop the table again (in vanilla
     208            0 :             // Postgres they would need to do the same).
     209            0 :             let operations = keys
     210            0 :                 .into_iter()
     211            0 :                 .filter_map(move |dbname| ctx.dbs.remove(&dbname))
     212            0 :                 .map(|db| Operation {
     213            0 :                     query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()),
     214            0 :                     comment: Some(format!("Dropping invalid database {}", db.name)),
     215            0 :                 });
     216            0 : 
     217            0 :             Ok(Box::new(operations))
     218              :         }
     219              :         ApplySpecPhase::RenameRoles => {
     220            0 :             let mut ctx = ctx.write().await;
     221              : 
     222            0 :             let operations = spec
     223            0 :                 .delta_operations
     224            0 :                 .iter()
     225            0 :                 .flatten()
     226            0 :                 .filter(|op| op.action == "rename_role")
     227            0 :                 .filter_map(move |op| {
     228            0 :                     let roles = &mut ctx.roles;
     229            0 : 
     230            0 :                     if roles.contains_key(op.name.as_str()) {
     231            0 :                         None
     232              :                     } else {
     233            0 :                         let new_name = op.new_name.as_ref().unwrap();
     234            0 :                         let mut role = roles.remove(op.name.as_str()).unwrap();
     235            0 : 
     236            0 :                         role.name = new_name.clone();
     237            0 :                         role.encrypted_password = None;
     238            0 :                         roles.insert(role.name.clone(), role);
     239            0 : 
     240            0 :                         Some(Operation {
     241            0 :                             query: format!(
     242            0 :                                 "ALTER ROLE {} RENAME TO {}",
     243            0 :                                 op.name.pg_quote(),
     244            0 :                                 new_name.pg_quote()
     245            0 :                             ),
     246            0 :                             comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)),
     247            0 :                         })
     248              :                     }
     249            0 :                 });
     250            0 : 
     251            0 :             Ok(Box::new(operations))
     252              :         }
     253              :         ApplySpecPhase::CreateAndAlterRoles => {
     254            0 :             let mut ctx = ctx.write().await;
     255              : 
     256            0 :             let operations = spec.cluster.roles
     257            0 :                 .iter()
     258            0 :                 .filter_map(move |role| {
     259            0 :                     let roles = &mut ctx.roles;
     260            0 :                     let db_role = roles.get(&role.name);
     261            0 : 
     262            0 :                     match db_role {
     263            0 :                         Some(db_role) => {
     264            0 :                             if db_role.encrypted_password != role.encrypted_password {
     265              :                                 // This can be run on /every/ role! Not just ones created through the console.
     266              :                                 // This means that if you add some funny ALTER here that adds a permission,
     267              :                                 // this will get run even on user-created roles! This will result in different
     268              :                                 // behavior before and after a spec gets reapplied. The below ALTER as it stands
     269              :                                 // now only grants LOGIN and changes the password. Please do not allow this branch
     270              :                                 // to do anything silly.
     271            0 :                                 Some(Operation {
     272            0 :                                     query: format!(
     273            0 :                                         "ALTER ROLE {} {}",
     274            0 :                                         role.name.pg_quote(),
     275            0 :                                         role.to_pg_options(),
     276            0 :                                     ),
     277            0 :                                     comment: None,
     278            0 :                                 })
     279              :                             } else {
     280            0 :                                 None
     281              :                             }
     282              :                         }
     283              :                         None => {
     284            0 :                             let query = if !jwks_roles.contains(role.name.as_str()) {
     285            0 :                                 format!(
     286            0 :                                     "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}",
     287            0 :                                     role.name.pg_quote(),
     288            0 :                                     role.to_pg_options(),
     289            0 :                                 )
     290              :                             } else {
     291            0 :                                 format!(
     292            0 :                                     "CREATE ROLE {} {}",
     293            0 :                                     role.name.pg_quote(),
     294            0 :                                     role.to_pg_options(),
     295            0 :                                 )
     296              :                             };
     297            0 :                             Some(Operation {
     298            0 :                                 query,
     299            0 :                                 comment: Some(format!("creating role {}", role.name)),
     300            0 :                             })
     301              :                         }
     302              :                     }
     303            0 :                 });
     304            0 : 
     305            0 :             Ok(Box::new(operations))
     306              :         }
     307              :         ApplySpecPhase::RenameAndDeleteDatabases => {
     308            0 :             let mut ctx = ctx.write().await;
     309              : 
     310            0 :             let operations = spec
     311            0 :                 .delta_operations
     312            0 :                 .iter()
     313            0 :                 .flatten()
     314            0 :                 .filter_map(move |op| {
     315            0 :                     let databases = &mut ctx.dbs;
     316            0 :                     match op.action.as_str() {
     317            0 :                         // We do not check whether the DB exists or not,
     318            0 :                         // Postgres will take care of it for us
     319            0 :                         "delete_db" => {
     320              :                             // In Postgres we can't drop a database if it is a template.
     321              :                             // So we need to unset the template flag first, but it could
     322              :                             // be a retry, so we could've already dropped the database.
     323              :                             // Check that database exists first to make it idempotent.
     324            0 :                             let unset_template_query: String = format!(
     325            0 :                                 include_str!("sql/unset_template_for_drop_dbs.sql"),
     326            0 :                                 datname_str = escape_literal(&op.name),
     327            0 :                                 datname = &op.name.pg_quote()
     328            0 :                             );
     329            0 : 
     330            0 :                             // Use FORCE to drop database even if there are active connections.
     331            0 :                             // We run this from `cloud_admin`, so it should have enough privileges.
     332            0 :                             //
     333            0 :                             // NB: there could be other db states, which prevent us from dropping
     334            0 :                             // the database. For example, if db is used by any active subscription
     335            0 :                             // or replication slot.
     336            0 :                             // Such cases are handled in the DropLogicalSubscriptions
     337            0 :                             // phase. We do all the cleanup before actually dropping the database.
     338            0 :                             let drop_db_query: String = format!(
     339            0 :                                 "DROP DATABASE IF EXISTS {} WITH (FORCE)",
     340            0 :                                 &op.name.pg_quote()
     341            0 :                             );
     342            0 : 
     343            0 :                             databases.remove(&op.name);
     344            0 : 
     345            0 :                             Some(vec![
     346            0 :                                 Operation {
     347            0 :                                     query: unset_template_query,
     348            0 :                                     comment: Some(format!(
     349            0 :                                         "optionally clearing template flags for DB {}",
     350            0 :                                         op.name,
     351            0 :                                     )),
     352            0 :                                 },
     353            0 :                                 Operation {
     354            0 :                                     query: drop_db_query,
     355            0 :                                     comment: Some(format!("deleting database {}", op.name,)),
     356            0 :                                 },
     357            0 :                             ])
     358              :                         }
     359            0 :                         "rename_db" => {
     360            0 :                             if let Some(mut db) = databases.remove(&op.name) {
     361              :                                 // update state of known databases
     362            0 :                                 let new_name = op.new_name.as_ref().unwrap();
     363            0 :                                 db.name = new_name.clone();
     364            0 :                                 databases.insert(db.name.clone(), db);
     365            0 : 
     366            0 :                                 Some(vec![Operation {
     367            0 :                                     query: format!(
     368            0 :                                         "ALTER DATABASE {} RENAME TO {}",
     369            0 :                                         op.name.pg_quote(),
     370            0 :                                         new_name.pg_quote(),
     371            0 :                                     ),
     372            0 :                                     comment: Some(format!(
     373            0 :                                         "renaming database '{}' to '{}'",
     374            0 :                                         op.name, new_name
     375            0 :                                     )),
     376            0 :                                 }])
     377              :                             } else {
     378            0 :                                 None
     379              :                             }
     380              :                         }
     381            0 :                         _ => None,
     382              :                     }
     383            0 :                 })
     384            0 :                 .flatten();
     385            0 : 
     386            0 :             Ok(Box::new(operations))
     387              :         }
     388              :         ApplySpecPhase::CreateAndAlterDatabases => {
     389            0 :             let mut ctx = ctx.write().await;
     390              : 
     391            0 :             let operations = spec
     392            0 :                 .cluster
     393            0 :                 .databases
     394            0 :                 .iter()
     395            0 :                 .filter_map(move |db| {
     396            0 :                     let databases = &mut ctx.dbs;
     397            0 :                     if let Some(edb) = databases.get_mut(&db.name) {
     398            0 :                         let change_owner = if edb.owner.starts_with('"') {
     399            0 :                             db.owner.pg_quote() != edb.owner
     400              :                         } else {
     401            0 :                             db.owner != edb.owner
     402              :                         };
     403              : 
     404            0 :                         edb.owner = db.owner.clone();
     405            0 : 
     406            0 :                         if change_owner {
     407            0 :                             Some(vec![Operation {
     408            0 :                                 query: format!(
     409            0 :                                     "ALTER DATABASE {} OWNER TO {}",
     410            0 :                                     db.name.pg_quote(),
     411            0 :                                     db.owner.pg_quote()
     412            0 :                                 ),
     413            0 :                                 comment: Some(format!(
     414            0 :                                     "changing database owner of database {} to {}",
     415            0 :                                     db.name, db.owner
     416            0 :                                 )),
     417            0 :                             }])
     418              :                         } else {
     419            0 :                             None
     420              :                         }
     421              :                     } else {
     422            0 :                         databases.insert(db.name.clone(), db.clone());
     423            0 : 
     424            0 :                         Some(vec![
     425            0 :                             Operation {
     426            0 :                                 query: format!(
     427            0 :                                     "CREATE DATABASE {} {}",
     428            0 :                                     db.name.pg_quote(),
     429            0 :                                     db.to_pg_options(),
     430            0 :                                 ),
     431            0 :                                 comment: None,
     432            0 :                             },
     433            0 :                             Operation {
     434            0 :                                 query: format!(
     435            0 :                                     "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
     436            0 :                                     db.name.pg_quote()
     437            0 :                                 ),
     438            0 :                                 comment: None,
     439            0 :                             },
     440            0 :                         ])
     441              :                     }
     442            0 :                 })
     443            0 :                 .flatten();
     444            0 : 
     445            0 :             Ok(Box::new(operations))
     446              :         }
     447            0 :         ApplySpecPhase::CreateSchemaNeon => Ok(Box::new(once(Operation {
     448            0 :             query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
     449            0 :             comment: Some(String::from(
     450            0 :                 "create schema for neon extension and utils tables",
     451            0 :             )),
     452            0 :         }))),
     453            0 :         ApplySpecPhase::RunInEachDatabase { db, subphase } => {
     454            0 :             match subphase {
     455              :                 PerDatabasePhase::DropLogicalSubscriptions => {
     456            0 :                     match &db {
     457            0 :                         DB::UserDB(db) => {
     458            0 :                             let drop_subscription_query: String = format!(
     459            0 :                                 include_str!("sql/drop_subscriptions.sql"),
     460            0 :                                 datname_str = escape_literal(&db.name),
     461            0 :                             );
     462            0 : 
     463            0 :                             let operations = vec![Operation {
     464            0 :                                 query: drop_subscription_query,
     465            0 :                                 comment: Some(format!(
     466            0 :                                     "optionally dropping subscriptions for DB {}",
     467            0 :                                     db.name,
     468            0 :                                 )),
     469            0 :                             }]
     470            0 :                             .into_iter();
     471            0 : 
     472            0 :                             Ok(Box::new(operations))
     473              :                         }
     474              :                         // skip this cleanup for the system databases
     475              :                         // because users can't drop them
     476            0 :                         DB::SystemDB => Ok(Box::new(empty())),
     477              :                     }
     478              :                 }
     479              :                 PerDatabasePhase::DeleteDBRoleReferences => {
     480            0 :                     let ctx = ctx.read().await;
     481              : 
     482            0 :                     let operations =
     483            0 :                         spec.delta_operations
     484            0 :                             .iter()
     485            0 :                             .flatten()
     486            0 :                             .filter(|op| op.action == "delete_role")
     487            0 :                             .filter_map(move |op| {
     488            0 :                                 if db.is_owned_by(&op.name) {
     489            0 :                                     return None;
     490            0 :                                 }
     491            0 :                                 if !ctx.roles.contains_key(&op.name) {
     492            0 :                                     return None;
     493            0 :                                 }
     494            0 :                                 let quoted = op.name.pg_quote();
     495            0 :                                 let new_owner = match &db {
     496            0 :                                     DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
     497            0 :                                     DB::UserDB(db) => db.owner.pg_quote(),
     498              :                                 };
     499              : 
     500            0 :                                 Some(vec![
     501            0 :                                     // This will reassign all dependent objects to the db owner
     502            0 :                                     Operation {
     503            0 :                                         query: format!(
     504            0 :                                             "REASSIGN OWNED BY {} TO {}",
     505            0 :                                             quoted, new_owner,
     506            0 :                                         ),
     507            0 :                                         comment: None,
     508            0 :                                     },
     509            0 :                                     // Revoke some potentially blocking privileges (Neon-specific currently)
     510            0 :                                     Operation {
     511            0 :                                         query: format!(
     512            0 :                                             include_str!("sql/pre_drop_role_revoke_privileges.sql"),
     513            0 :                                             role_name = quoted,
     514            0 :                                         ),
     515            0 :                                         comment: None,
     516            0 :                                     },
     517            0 :                                     // This now will only drop privileges of the role
     518            0 :                                     // TODO: this is obviously not 100% true because of the above case,
     519            0 :                                     // there could be still some privileges that are not revoked. Maybe this
     520            0 :                                     // only drops privileges that were granted *by this* role, not *to this* role,
     521            0 :                                     // but this has to be checked.
     522            0 :                                     Operation {
     523            0 :                                         query: format!("DROP OWNED BY {}", quoted),
     524            0 :                                         comment: None,
     525            0 :                                     },
     526            0 :                                 ])
     527            0 :                             })
     528            0 :                             .flatten();
     529            0 : 
     530            0 :                     Ok(Box::new(operations))
     531              :                 }
     532              :                 PerDatabasePhase::ChangeSchemaPerms => {
     533            0 :                     let ctx = ctx.read().await;
     534            0 :                     let databases = &ctx.dbs;
     535              : 
     536            0 :                     let db = match &db {
     537              :                         // ignore schema permissions on the system database
     538            0 :                         DB::SystemDB => return Ok(Box::new(empty())),
     539            0 :                         DB::UserDB(db) => db,
     540            0 :                     };
     541            0 : 
     542            0 :                     if databases.get(&db.name).is_none() {
     543            0 :                         bail!("database {} doesn't exist in PostgreSQL", db.name);
     544            0 :                     }
     545            0 : 
     546            0 :                     let edb = databases.get(&db.name).unwrap();
     547            0 : 
     548            0 :                     if edb.restrict_conn || edb.invalid {
     549            0 :                         return Ok(Box::new(empty()));
     550            0 :                     }
     551            0 : 
     552            0 :                     let operations = vec![
     553            0 :                         Operation {
     554            0 :                             query: format!(
     555            0 :                                 include_str!("sql/set_public_schema_owner.sql"),
     556            0 :                                 db_owner = db.owner.pg_quote()
     557            0 :                             ),
     558            0 :                             comment: None,
     559            0 :                         },
     560            0 :                         Operation {
     561            0 :                             query: String::from(include_str!("sql/default_grants.sql")),
     562            0 :                             comment: None,
     563            0 :                         },
     564            0 :                     ]
     565            0 :                     .into_iter();
     566            0 : 
     567            0 :                     Ok(Box::new(operations))
     568              :                 }
     569              :                 PerDatabasePhase::HandleAnonExtension => {
     570              :                     // Only install Anon into user databases
     571            0 :                     let db = match &db {
     572            0 :                         DB::SystemDB => return Ok(Box::new(empty())),
     573            0 :                         DB::UserDB(db) => db,
     574            0 :                     };
     575            0 :                     // Never install Anon when it's not enabled as feature
     576            0 :                     if !spec.features.contains(&ComputeFeature::AnonExtension) {
     577            0 :                         return Ok(Box::new(empty()));
     578            0 :                     }
     579            0 : 
     580            0 :                     // Only install Anon when it's added in preload libraries
     581            0 :                     let opt_libs = spec.cluster.settings.find("shared_preload_libraries");
     582              : 
     583            0 :                     let libs = match opt_libs {
     584            0 :                         Some(libs) => libs,
     585            0 :                         None => return Ok(Box::new(empty())),
     586              :                     };
     587              : 
     588            0 :                     if !libs.contains("anon") {
     589            0 :                         return Ok(Box::new(empty()));
     590            0 :                     }
     591            0 : 
     592            0 :                     let db_owner = db.owner.pg_quote();
     593            0 : 
     594            0 :                     let operations = vec![
     595            0 :                         // Create anon extension if this compute needs it
     596            0 :                         // Users cannot create it themselves, because superuser is required.
     597            0 :                         Operation {
     598            0 :                             query: String::from("CREATE EXTENSION IF NOT EXISTS anon CASCADE"),
     599            0 :                             comment: Some(String::from("creating anon extension")),
     600            0 :                         },
     601            0 :                         // Initialize anon extension
     602            0 :                         // This also requires superuser privileges, so users cannot do it themselves.
     603            0 :                         Operation {
     604            0 :                             query: String::from("SELECT anon.init()"),
     605            0 :                             comment: Some(String::from("initializing anon extension data")),
     606            0 :                         },
     607            0 :                         Operation {
     608            0 :                             query: format!("GRANT ALL ON SCHEMA anon TO {}", db_owner),
     609            0 :                             comment: Some(String::from(
     610            0 :                                 "granting anon extension schema permissions",
     611            0 :                             )),
     612            0 :                         },
     613            0 :                         Operation {
     614            0 :                             query: format!(
     615            0 :                                 "GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}",
     616            0 :                                 db_owner
     617            0 :                             ),
     618            0 :                             comment: Some(String::from(
     619            0 :                                 "granting anon extension schema functions permissions",
     620            0 :                             )),
     621            0 :                         },
     622            0 :                         // We need this, because some functions are defined as SECURITY DEFINER.
     623            0 :                         // In Postgres SECURITY DEFINER functions are executed with the privileges
     624            0 :                         // of the owner.
     625            0 :                         // In the anon extension this is needed to access some GUCs, which are only accessible to
     626            0 :                         // superuser. But we've patched postgres to allow db_owner to access them as well.
     627            0 :                         // So we need to change owner of these functions to db_owner.
     628            0 :                         Operation {
     629            0 :                             query: format!(
     630            0 :                                 include_str!("sql/anon_ext_fn_reassign.sql"),
     631            0 :                                 db_owner = db_owner,
     632            0 :                             ),
     633            0 :                             comment: Some(String::from(
     634            0 :                                 "change anon extension functions owner to database_owner",
     635            0 :                             )),
     636            0 :                         },
     637            0 :                         Operation {
     638            0 :                             query: format!(
     639            0 :                                 "GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}",
     640            0 :                                 db_owner,
     641            0 :                             ),
     642            0 :                             comment: Some(String::from(
     643            0 :                                 "granting anon extension tables permissions",
     644            0 :                             )),
     645            0 :                         },
     646            0 :                         Operation {
     647            0 :                             query: format!(
     648            0 :                                 "GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}",
     649            0 :                                 db_owner,
     650            0 :                             ),
     651            0 :                             comment: Some(String::from(
     652            0 :                                 "granting anon extension sequences permissions",
     653            0 :                             )),
     654            0 :                         },
     655            0 :                     ]
     656            0 :                     .into_iter();
     657            0 : 
     658            0 :                     Ok(Box::new(operations))
     659              :                 }
     660              :             }
     661              :         }
     662              :         // Interestingly, we only install p_s_s in the main database, even when
     663              :         // it's preloaded.
     664              :         ApplySpecPhase::HandleOtherExtensions => {
     665            0 :             if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
     666            0 :                 if libs.contains("pg_stat_statements") {
     667            0 :                     return Ok(Box::new(once(Operation {
     668            0 :                         query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
     669            0 :                         comment: Some(String::from("create system extensions")),
     670            0 :                     })));
     671            0 :                 }
     672            0 :             }
     673            0 :             Ok(Box::new(empty()))
     674              :         }
     675              :         ApplySpecPhase::HandleNeonExtension => {
     676            0 :             let operations = vec![
     677            0 :                 Operation {
     678            0 :                     query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
     679            0 :                     comment: Some(String::from(
     680            0 :                         "init: install the extension if not already installed",
     681            0 :                     )),
     682            0 :                 },
     683            0 :                 Operation {
     684            0 :                     query: String::from(
     685            0 :                         "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
     686            0 :                     ),
     687            0 :                     comment: Some(String::from("compat/fix: make neon relocatable")),
     688            0 :                 },
     689            0 :                 Operation {
     690            0 :                     query: String::from("ALTER EXTENSION neon SET SCHEMA neon"),
     691            0 :                     comment: Some(String::from("compat/fix: alter neon extension schema")),
     692            0 :                 },
     693            0 :                 Operation {
     694            0 :                     query: String::from("ALTER EXTENSION neon UPDATE"),
     695            0 :                     comment: Some(String::from("compat/update: update neon extension version")),
     696            0 :                 },
     697            0 :             ]
     698            0 :             .into_iter();
     699            0 : 
     700            0 :             Ok(Box::new(operations))
     701              :         }
     702            0 :         ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
     703            0 :             query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
     704            0 :             comment: None,
     705            0 :         }))),
     706              :         ApplySpecPhase::DropRoles => {
     707            0 :             let operations = spec
     708            0 :                 .delta_operations
     709            0 :                 .iter()
     710            0 :                 .flatten()
     711            0 :                 .filter(|op| op.action == "delete_role")
     712            0 :                 .map(|op| Operation {
     713            0 :                     query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()),
     714            0 :                     comment: None,
     715            0 :                 });
     716            0 : 
     717            0 :             Ok(Box::new(operations))
     718              :         }
     719            0 :         ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
     720            0 :             query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
     721            0 :             comment: None,
     722            0 :         }))),
     723              :     }
     724            0 : }
        

Generated by: LCOV version 2.1-beta