LCOV - code coverage report
Current view: top level - compute_tools/src - spec_apply.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 0.0 % 802 0
Test Date: 2025-07-31 15:59:03 Functions: 0.0 % 53 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::fmt::{Debug, Formatter};
       3              : use std::future::Future;
       4              : use std::iter::{empty, once};
       5              : use std::sync::Arc;
       6              : 
       7              : use anyhow::{Context, Result};
       8              : use compute_api::responses::ComputeStatus;
       9              : use compute_api::spec::{ComputeAudit, ComputeSpec, Database, PgIdent, Role};
      10              : use futures::future::join_all;
      11              : use tokio::sync::RwLock;
      12              : use tokio_postgres::Client;
      13              : use tokio_postgres::error::SqlState;
      14              : use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
      15              : 
      16              : use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState, create_databricks_roles};
      17              : use crate::hadron_metrics::COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS;
      18              : use crate::pg_helpers::{
      19              :     DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
      20              :     get_existing_roles_async,
      21              : };
      22              : use crate::spec_apply::ApplySpecPhase::{
      23              :     AddDatabricksGrants, AlterDatabricksRoles, CreateAndAlterDatabases, CreateAndAlterRoles,
      24              :     CreateAvailabilityCheck, CreateDatabricksMisc, CreateDatabricksRoles, CreatePgauditExtension,
      25              :     CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon,
      26              :     DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
      27              :     HandleDatabricksAuthExtension, HandleNeonExtension, HandleOtherExtensions,
      28              :     RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
      29              : };
      30              : use crate::spec_apply::PerDatabasePhase::{
      31              :     ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions,
      32              : };
      33              : 
      34              : impl ComputeNode {
      35              :     /// Apply the spec to the running PostgreSQL instance.
      36              :     /// The caller can decide to run with multiple clients in parallel, or
      37              :     /// single mode.  Either way, the commands executed will be the same, and
      38              :     /// only commands run in different databases are parallelized.
      39              :     #[instrument(skip_all)]
      40              :     pub fn apply_spec_sql(
      41              :         &self,
      42              :         spec: Arc<ComputeSpec>,
      43              :         conf: Arc<tokio_postgres::Config>,
      44              :         concurrency: usize,
      45              :     ) -> Result<()> {
      46              :         info!("Applying config with max {} concurrency", concurrency);
      47              :         debug!("Config: {:?}", spec);
      48              : 
      49              :         let rt = tokio::runtime::Handle::current();
      50            0 :         rt.block_on(async {
      51              :             // Proceed with post-startup configuration. Note that the order of operations is important.
      52            0 :             let client = Self::get_maintenance_client(&conf).await?;
      53            0 :             let spec = spec.clone();
      54            0 :             let params = Arc::new(self.params.clone());
      55              : 
      56            0 :             let databases = get_existing_dbs_async(&client).await?;
      57            0 :             let roles = get_existing_roles_async(&client)
      58            0 :                 .await?
      59            0 :                 .into_iter()
      60            0 :                 .map(|role| (role.name.clone(), role))
      61            0 :                 .collect::<HashMap<String, Role>>();
      62              : 
      63              :             // Check if we need to drop subscriptions before starting the endpoint.
      64              :             //
      65              :             // It is important to do this operation exactly once when endpoint starts on a new branch.
      66              :             // Otherwise, we may drop subscriptions that were newly created rather than inherited.
      67              :             //
      68              :             // We cannot rely only on spec.drop_subscriptions_before_start flag,
      69              :             // because if for some reason compute restarts inside VM,
      70              :             // it will start again with the same spec and flag value.
      71              :             //
      72              :             // To handle this, we save the fact of the operation in the database
      73              :             // in the neon.drop_subscriptions_done table.
      74              :             // If the table does not exist, we assume that the operation was never performed, so we must do it.
      75              :             // If the table exists, we check if the operation was performed on the current timeline.
      76              :             //
      77            0 :             let mut drop_subscriptions_done = false;
      78              : 
      79            0 :             if spec.drop_subscriptions_before_start {
      80            0 :                 let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
      81              : 
      82            0 :                 info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
      83              : 
      84              :                 drop_subscriptions_done = match
      85            0 :                     client.query("select 1 from neon.drop_subscriptions_done where timeline_id OPERATOR(pg_catalog.=) $1", &[&timeline_id.to_string()]).await {
      86            0 :                     Ok(result) => !result.is_empty(),
      87            0 :                     Err(e) =>
      88              :                     {
      89            0 :                         match e.code() {
      90            0 :                             Some(&SqlState::UNDEFINED_TABLE) => false,
      91              :                             _ => {
      92              :                                 // We don't expect any other error here, except for the schema/table not existing
      93            0 :                                 error!("Error checking if drop subscription operation was already performed: {}", e);
      94            0 :                                 return Err(e.into());
      95              :                             }
      96              :                         }
      97              :                     }
      98              :                 }
      99            0 :             };
     100              : 
     101              : 
     102            0 :             let jwks_roles = Arc::new(
     103            0 :                 spec.as_ref()
     104            0 :                     .local_proxy_config
     105            0 :                     .iter()
     106            0 :                     .flat_map(|it| &it.jwks)
     107            0 :                     .flatten()
     108            0 :                     .flat_map(|setting| &setting.role_names)
     109            0 :                     .cloned()
     110            0 :                     .collect::<HashSet<_>>(),
     111              :             );
     112              : 
     113            0 :             let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
     114            0 :                 roles,
     115            0 :                 dbs: databases,
     116            0 :             }));
     117              : 
     118              :             // Apply special pre drop database phase.
     119              :             // NOTE: we use the code of RunInEachDatabase phase for parallelism
     120              :             // and connection management, but we don't really run it in *each* database,
     121              :             // only in the databases we're about to drop.
     122            0 :             info!("Applying PerDatabase (pre-dropdb) phase");
     123            0 :             let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
     124              : 
     125              :             // Run the phase for each database that we're about to drop.
     126            0 :             let db_processes = spec
     127            0 :                 .delta_operations
     128            0 :                 .iter()
     129            0 :                 .flatten()
     130            0 :                 .filter_map(move |op| {
     131            0 :                     if op.action.as_str() == "delete_db" {
     132            0 :                         Some(op.name.clone())
     133              :                     } else {
     134            0 :                         None
     135              :                     }
     136            0 :                 })
     137            0 :                 .map(|dbname| {
     138            0 :                     let spec = spec.clone();
     139            0 :                     let ctx = ctx.clone();
     140            0 :                     let jwks_roles = jwks_roles.clone();
     141            0 :                     let mut conf = conf.as_ref().clone();
     142            0 :                     let concurrency_token = concurrency_token.clone();
     143              :                     // We only need dbname field for this phase, so set other fields to dummy values
     144            0 :                     let db = DB::UserDB(Database {
     145            0 :                         name: dbname.clone(),
     146            0 :                         owner: "cloud_admin".to_string(),
     147            0 :                         options: None,
     148            0 :                         restrict_conn: false,
     149            0 :                         invalid: false,
     150            0 :                     });
     151              : 
     152            0 :                     debug!("Applying per-database phases for Database {:?}", &db);
     153              : 
     154            0 :                     match &db {
     155            0 :                         DB::SystemDB => {}
     156            0 :                         DB::UserDB(db) => {
     157            0 :                             conf.dbname(db.name.as_str());
     158            0 :                         }
     159              :                     }
     160              : 
     161            0 :                     let conf = Arc::new(conf);
     162            0 :                     let fut = Self::apply_spec_sql_db(
     163            0 :                         params.clone(),
     164            0 :                         spec.clone(),
     165            0 :                         conf,
     166            0 :                         ctx.clone(),
     167            0 :                         jwks_roles.clone(),
     168            0 :                         concurrency_token.clone(),
     169            0 :                         db,
     170            0 :                         [DropLogicalSubscriptions].to_vec(),
     171            0 :                         self.params.lakebase_mode,
     172              :                     );
     173              : 
     174            0 :                     Ok(tokio::spawn(fut))
     175            0 :                 })
     176            0 :                 .collect::<Vec<Result<_, anyhow::Error>>>();
     177              : 
     178            0 :             for process in db_processes.into_iter() {
     179            0 :                 let handle = process?;
     180            0 :                 if let Err(e) = handle.await? {
     181              :                     // Handle the error case where the database does not exist
     182              :                     // We do not check whether the DB exists or not in the deletion phase,
     183              :                     // so we shouldn't be strict about it in pre-deletion cleanup as well.
     184            0 :                     if e.to_string().contains("does not exist") {
     185            0 :                         warn!("Error dropping subscription: {}", e);
     186              :                     } else {
     187            0 :                         return Err(e);
     188              :                     }
     189            0 :                 };
     190              :             }
     191              : 
     192            0 :             let phases = if self.params.lakebase_mode {
     193            0 :                 vec![
     194            0 :                     CreatePrivilegedRole,
     195              :                 // BEGIN_HADRON
     196            0 :                 CreateDatabricksRoles,
     197            0 :                 AlterDatabricksRoles,
     198              :                 // END_HADRON
     199            0 :                 DropInvalidDatabases,
     200            0 :                 RenameRoles,
     201            0 :                 CreateAndAlterRoles,
     202            0 :                 RenameAndDeleteDatabases,
     203            0 :                 CreateAndAlterDatabases,
     204            0 :                 CreateSchemaNeon,
     205              :             ]
     206              :             } else {
     207            0 :                 vec![
     208            0 :                     CreatePrivilegedRole,
     209            0 :                 DropInvalidDatabases,
     210            0 :                 RenameRoles,
     211            0 :                 CreateAndAlterRoles,
     212            0 :                 RenameAndDeleteDatabases,
     213            0 :                 CreateAndAlterDatabases,
     214            0 :                 CreateSchemaNeon,
     215              :             ]
     216              :             };
     217              : 
     218            0 :             for phase in phases {
     219            0 :                 info!("Applying phase {:?}", &phase);
     220            0 :                 apply_operations(
     221            0 :                     params.clone(),
     222            0 :                     spec.clone(),
     223            0 :                     ctx.clone(),
     224            0 :                     jwks_roles.clone(),
     225            0 :                     phase,
     226            0 :                     || async { Ok(&client) },
     227            0 :                     self.params.lakebase_mode,
     228              :                 )
     229            0 :                 .await?;
     230              :             }
     231              : 
     232            0 :             info!("Applying RunInEachDatabase2 phase");
     233            0 :             let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
     234              : 
     235            0 :             let db_processes = spec
     236            0 :                 .cluster
     237            0 :                 .databases
     238            0 :                 .iter()
     239            0 :                 .map(|db| DB::new(db.clone()))
     240              :                 // also include DB::SystemDB
     241            0 :                 .chain(once(DB::SystemDB))
     242            0 :                 .map(|db| {
     243            0 :                     let spec = spec.clone();
     244            0 :                     let ctx = ctx.clone();
     245            0 :                     let jwks_roles = jwks_roles.clone();
     246            0 :                     let mut conf = conf.as_ref().clone();
     247            0 :                     let concurrency_token = concurrency_token.clone();
     248            0 :                     let db = db.clone();
     249              : 
     250            0 :                     debug!("Applying per-database phases for Database {:?}", &db);
     251              : 
     252            0 :                     match &db {
     253            0 :                         DB::SystemDB => {}
     254            0 :                         DB::UserDB(db) => {
     255            0 :                             conf.dbname(db.name.as_str());
     256            0 :                         }
     257              :                     }
     258              : 
     259            0 :                     let conf = Arc::new(conf);
     260            0 :                     let mut phases = vec![
     261            0 :                         DeleteDBRoleReferences,
     262            0 :                         ChangeSchemaPerms,
     263              :                     ];
     264              : 
     265            0 :                     if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
     266            0 :                         info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
     267            0 :                         phases.push(DropLogicalSubscriptions);
     268            0 :                     }
     269              : 
     270            0 :                     let fut = Self::apply_spec_sql_db(
     271            0 :                         params.clone(),
     272            0 :                         spec.clone(),
     273            0 :                         conf,
     274            0 :                         ctx.clone(),
     275            0 :                         jwks_roles.clone(),
     276            0 :                         concurrency_token.clone(),
     277            0 :                         db,
     278            0 :                         phases,
     279            0 :                         self.params.lakebase_mode,
     280              :                     );
     281              : 
     282            0 :                     Ok(tokio::spawn(fut))
     283            0 :                 })
     284            0 :                 .collect::<Vec<Result<_, anyhow::Error>>>();
     285              : 
     286            0 :             for process in db_processes.into_iter() {
     287            0 :                 let handle = process?;
     288            0 :                 handle.await??;
     289              :             }
     290              : 
     291            0 :             let mut phases = if self.params.lakebase_mode {
     292            0 :                 vec![
     293            0 :                 HandleOtherExtensions,
     294            0 :                 HandleNeonExtension, // This step depends on CreateSchemaNeon
     295              :                 // BEGIN_HADRON
     296            0 :                 HandleDatabricksAuthExtension,
     297              :                 // END_HADRON
     298            0 :                 CreateAvailabilityCheck,
     299            0 :                 DropRoles,
     300              :                 // BEGIN_HADRON
     301            0 :                 AddDatabricksGrants,
     302            0 :                 CreateDatabricksMisc,
     303              :                 // END_HADRON
     304              :             ]
     305              :             } else {
     306            0 :                 vec![
     307            0 :                 HandleOtherExtensions,
     308            0 :                 HandleNeonExtension, // This step depends on CreateSchemaNeon
     309            0 :                 CreateAvailabilityCheck,
     310            0 :                 DropRoles,
     311              :             ]
     312              :             };
     313              : 
     314              :             // This step depends on CreateSchemaNeon
     315            0 :             if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
     316            0 :                 info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
     317            0 :                 phases.push(FinalizeDropLogicalSubscriptions);
     318            0 :             }
     319              : 
     320              :             // Keep DisablePostgresDBPgAudit phase at the end,
     321              :             // so that all config operations are audit logged.
     322            0 :             match spec.audit_log_level
     323              :             {
     324            0 :                 ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
     325            0 :                     phases.push(CreatePgauditExtension);
     326            0 :                     phases.push(CreatePgauditlogtofileExtension);
     327            0 :                     phases.push(DisablePostgresDBPgAudit);
     328            0 :                 }
     329            0 :                 ComputeAudit::Log | ComputeAudit::Base => {
     330            0 :                     phases.push(CreatePgauditExtension);
     331            0 :                     phases.push(DisablePostgresDBPgAudit);
     332            0 :                 }
     333            0 :                 ComputeAudit::Disabled => {}
     334              :             }
     335              : 
     336            0 :             for phase in phases {
     337            0 :                 debug!("Applying phase {:?}", &phase);
     338            0 :                 apply_operations(
     339            0 :                     params.clone(),
     340            0 :                     spec.clone(),
     341            0 :                     ctx.clone(),
     342            0 :                     jwks_roles.clone(),
     343            0 :                     phase,
     344            0 :                     || async { Ok(&client) },
     345            0 :                     self.params.lakebase_mode,
     346              :                 )
     347            0 :                 .await?;
     348              :             }
     349              : 
     350            0 :             Ok::<(), anyhow::Error>(())
     351            0 :         })?;
     352              : 
     353              :         Ok(())
     354              :     }
     355              : 
     356              :     /// Apply SQL migrations of the RunInEachDatabase phase.
     357              :     ///
     358              :     /// May opt to not connect to databases that don't have any scheduled
     359              :     /// operations.  The function is concurrency-controlled with the provided
     360              :     /// semaphore.  The caller has to make sure the semaphore isn't exhausted.
     361              :     #[allow(clippy::too_many_arguments)] // TODO: needs bigger refactoring
     362            0 :     async fn apply_spec_sql_db(
     363            0 :         params: Arc<ComputeNodeParams>,
     364            0 :         spec: Arc<ComputeSpec>,
     365            0 :         conf: Arc<tokio_postgres::Config>,
     366            0 :         ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
     367            0 :         jwks_roles: Arc<HashSet<String>>,
     368            0 :         concurrency_token: Arc<tokio::sync::Semaphore>,
     369            0 :         db: DB,
     370            0 :         subphases: Vec<PerDatabasePhase>,
     371            0 :         lakebase_mode: bool,
     372            0 :     ) -> Result<()> {
     373            0 :         let _permit = concurrency_token.acquire().await?;
     374              : 
     375            0 :         let mut client_conn = None;
     376              : 
     377            0 :         for subphase in subphases {
     378            0 :             apply_operations(
     379            0 :                 params.clone(),
     380            0 :                 spec.clone(),
     381            0 :                 ctx.clone(),
     382            0 :                 jwks_roles.clone(),
     383            0 :                 RunInEachDatabase {
     384            0 :                     db: db.clone(),
     385            0 :                     subphase,
     386            0 :                 },
     387              :                 // Only connect if apply_operation actually wants a connection.
     388              :                 // It's quite possible this database doesn't need any queries,
     389              :                 // so by not connecting we save time and effort connecting to
     390              :                 // that database.
     391            0 :                 || async {
     392            0 :                     if client_conn.is_none() {
     393            0 :                         let db_client = Self::get_maintenance_client(&conf).await?;
     394            0 :                         client_conn.replace(db_client);
     395            0 :                     }
     396            0 :                     let client = client_conn.as_ref().unwrap();
     397            0 :                     Ok(client)
     398            0 :                 },
     399            0 :                 lakebase_mode,
     400              :             )
     401            0 :             .await?;
     402              :         }
     403              : 
     404            0 :         drop(client_conn);
     405              : 
     406            0 :         Ok::<(), anyhow::Error>(())
     407            0 :     }
     408              : 
     409              :     /// Choose how many concurrent connections to use for applying the spec changes.
     410            0 :     pub fn max_service_connections(
     411            0 :         &self,
     412            0 :         compute_state: &ComputeState,
     413            0 :         spec: &ComputeSpec,
     414            0 :     ) -> usize {
     415              :         // If the cluster is in Init state we don't have to deal with user connections,
     416              :         // and can thus use all `max_connections` connection slots. However, that's generally not
     417              :         // very efficient, so we generally still limit it to a smaller number.
     418            0 :         if compute_state.status == ComputeStatus::Init {
     419              :             // If the settings contain 'max_connections', use that as template
     420            0 :             if let Some(config) = spec.cluster.settings.find("max_connections") {
     421            0 :                 config.parse::<usize>().ok()
     422              :             } else {
     423              :                 // Otherwise, try to find the setting in the postgresql_conf string
     424            0 :                 spec.cluster
     425            0 :                     .postgresql_conf
     426            0 :                     .iter()
     427            0 :                     .flat_map(|conf| conf.split("\n"))
     428            0 :                     .filter_map(|line| {
     429            0 :                         if !line.contains("max_connections") {
     430            0 :                             return None;
     431            0 :                         }
     432              : 
     433            0 :                         let (key, value) = line.split_once("=")?;
     434            0 :                         let key = key
     435            0 :                             .trim_start_matches(char::is_whitespace)
     436            0 :                             .trim_end_matches(char::is_whitespace);
     437              : 
     438            0 :                         let value = value
     439            0 :                             .trim_start_matches(char::is_whitespace)
     440            0 :                             .trim_end_matches(char::is_whitespace);
     441              : 
     442            0 :                         if key != "max_connections" {
     443            0 :                             return None;
     444            0 :                         }
     445              : 
     446            0 :                         value.parse::<usize>().ok()
     447            0 :                     })
     448            0 :                     .next()
     449              :             }
     450              :             // If max_connections is present, use at most 1/3rd of that.
     451              :             // When max_connections is lower than 30, try to use at least 10 connections, but
     452              :             // never more than max_connections.
     453            0 :             .map(|limit| match limit {
     454            0 :                 0..10 => limit,
     455            0 :                 10..30 => 10,
     456            0 :                 30..300 => limit / 3,
     457            0 :                 300.. => 100,
     458            0 :             })
     459              :             // If we didn't find max_connections, default to 10 concurrent connections.
     460            0 :             .unwrap_or(10)
     461              :         } else {
     462              :             // state == Running
     463              :             // Because the cluster is already in the Running state, we should assume users are
     464              :             // already connected to the cluster, and high concurrency could negatively
     465              :             // impact user connectivity. Therefore, we can limit concurrency to the number of
     466              :             // reserved superuser connections, which users wouldn't be able to use anyway.
     467            0 :             spec.cluster
     468            0 :                 .settings
     469            0 :                 .find("superuser_reserved_connections")
     470            0 :                 .iter()
     471            0 :                 .filter_map(|val| val.parse::<usize>().ok())
     472            0 :                 .map(|val| if val > 1 { val - 1 } else { 1 })
     473            0 :                 .next_back()
     474            0 :                 .unwrap_or(3)
     475              :         }
     476            0 :     }
     477              : }
     478              : 
     479              : #[derive(Clone)]
     480              : pub enum DB {
     481              :     SystemDB,
     482              :     UserDB(Database),
     483              : }
     484              : 
     485              : impl DB {
     486            0 :     pub fn new(db: Database) -> DB {
     487            0 :         Self::UserDB(db)
     488            0 :     }
     489              : 
     490            0 :     pub fn is_owned_by(&self, role: &PgIdent) -> bool {
     491            0 :         match self {
     492            0 :             DB::SystemDB => false,
     493            0 :             DB::UserDB(db) => &db.owner == role,
     494              :         }
     495            0 :     }
     496              : }
     497              : 
     498              : impl Debug for DB {
     499            0 :     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
     500            0 :         match self {
     501            0 :             DB::SystemDB => f.debug_tuple("SystemDB").finish(),
     502            0 :             DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(),
     503              :         }
     504            0 :     }
     505              : }
     506              : 
     507              : #[derive(Copy, Clone, Debug)]
     508              : pub enum PerDatabasePhase {
     509              :     DeleteDBRoleReferences,
     510              :     ChangeSchemaPerms,
     511              :     /// This is a shared phase, used for both i) dropping dangling LR subscriptions
     512              :     /// before dropping the DB, and ii) dropping all subscriptions after creating
     513              :     /// a fresh branch.
     514              :     /// N.B. we will skip all DBs that are not present in Postgres, invalid, or
     515              :     /// have `datallowconn = false` (`restrict_conn`).
     516              :     DropLogicalSubscriptions,
     517              : }
     518              : 
/// One step of applying a compute spec to Postgres.
///
/// `get_operations` turns a phase into the list of SQL operations to run,
/// and `apply_operations` executes that list.
#[derive(Clone, Debug)]
pub enum ApplySpecPhase {
    /// Runs `sql/create_privileged_role.sql` for the configured privileged
    /// role name.
    CreatePrivilegedRole,
    // BEGIN_HADRON
    /// Runs the queries returned by `create_databricks_roles()`.
    CreateDatabricksRoles,
    /// Backfills existing `databricks_reader_*` roles with a statement timeout.
    AlterDatabricksRoles,
    // END_HADRON
    /// Drops every known database that is flagged as invalid.
    DropInvalidDatabases,
    /// Handles the `rename_role` delta operations of the spec.
    RenameRoles,
    /// Creates spec roles that are not known yet and alters known roles whose
    /// encrypted password differs from the spec.
    CreateAndAlterRoles,
    /// Handles the `delete_db` and `rename_db` delta operations of the spec.
    RenameAndDeleteDatabases,
    /// Creates spec databases that are not known yet and changes the owner of
    /// known databases when it differs from the spec.
    CreateAndAlterDatabases,
    /// Runs `CREATE SCHEMA IF NOT EXISTS neon`.
    CreateSchemaNeon,
    /// Runs `subphase` inside the database indicated by `db`.
    RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
    CreatePgauditExtension,
    CreatePgauditlogtofileExtension,
    DisablePostgresDBPgAudit,
    HandleOtherExtensions,
    HandleNeonExtension,
    // BEGIN_HADRON
    HandleDatabricksAuthExtension,
    // END_HADRON
    CreateAvailabilityCheck,
    // BEGIN_HADRON
    AddDatabricksGrants,
    CreateDatabricksMisc,
    // END_HADRON
    DropRoles,
    FinalizeDropLogicalSubscriptions,
}
     549              : 
     550              : pub struct Operation {
     551              :     pub query: String,
     552              :     pub comment: Option<String>,
     553              : }
     554              : 
/// Roles and databases known while a spec is being applied.
///
/// `get_operations` updates these maps as it generates operations (for
/// example it removes dropped databases, re-keys renamed ones and inserts
/// newly created ones), so that later phases see the expected state.
pub struct MutableApplyContext {
    /// Known roles, keyed by role name.
    pub roles: HashMap<String, Role>,
    /// Known databases, keyed by database name.
    pub dbs: HashMap<String, Database>,
}
     559              : 
     560              : /// Apply the operations that belong to the given spec apply phase.
     561              : ///
     562              : /// Commands within a single phase are executed in order of Iterator yield.
     563              : /// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
     564              : /// indicated by its `db` field, and can share a single client for all changes
     565              : /// to that database.
     566              : ///
     567              : /// Notes:
     568              : /// - Commands are pipelined, and thus may cause incomplete apply if one
     569              : ///   command of many fails.
     570              : /// - Failing commands will fail the phase's apply step once the return value
     571              : ///   is processed.
     572              : /// - No timeouts have (yet) been implemented.
     573              : /// - The caller is responsible for limiting and/or applying concurrency.
     574            0 : pub async fn apply_operations<'a, Fut, F>(
     575            0 :     params: Arc<ComputeNodeParams>,
     576            0 :     spec: Arc<ComputeSpec>,
     577            0 :     ctx: Arc<RwLock<MutableApplyContext>>,
     578            0 :     jwks_roles: Arc<HashSet<String>>,
     579            0 :     apply_spec_phase: ApplySpecPhase,
     580            0 :     client: F,
     581            0 :     lakebase_mode: bool,
     582            0 : ) -> Result<()>
     583            0 : where
     584            0 :     F: FnOnce() -> Fut,
     585            0 :     Fut: Future<Output = Result<&'a Client>>,
     586            0 : {
     587            0 :     debug!("Starting phase {:?}", &apply_spec_phase);
     588            0 :     let span = info_span!("db_apply_changes", phase=?apply_spec_phase);
     589            0 :     let span2 = span.clone();
     590            0 :     async move {
     591            0 :         debug!("Processing phase {:?}", &apply_spec_phase);
     592            0 :         let ctx = ctx;
     593              : 
     594            0 :         let mut ops = get_operations(&params, &spec, &ctx, &jwks_roles, &apply_spec_phase)
     595            0 :             .await?
     596            0 :             .peekable();
     597              : 
     598              :         // Return (and by doing so, skip requesting the PostgreSQL client) if
     599              :         // we don't have any operations scheduled.
     600            0 :         if ops.peek().is_none() {
     601            0 :             return Ok(());
     602            0 :         }
     603              : 
     604            0 :         let client = client().await?;
     605              : 
     606            0 :         debug!("Applying phase {:?}", &apply_spec_phase);
     607              : 
     608            0 :         let active_queries = ops
     609            0 :             .map(|op| {
     610            0 :                 let Operation { comment, query } = op;
     611            0 :                 let inspan = match comment {
     612            0 :                     None => span.clone(),
     613            0 :                     Some(comment) => info_span!("phase {}: {}", comment),
     614              :                 };
     615              : 
     616            0 :                 async {
     617            0 :                     let query = query;
     618            0 :                     let res = client.simple_query(&query).await;
     619            0 :                     debug!(
     620            0 :                         "{} {}",
     621            0 :                         if res.is_ok() {
     622            0 :                             "successfully executed"
     623              :                         } else {
     624            0 :                             "failed to execute"
     625              :                         },
     626              :                         query
     627              :                     );
     628            0 :                     if !lakebase_mode {
     629            0 :                         return res;
     630            0 :                     }
     631              :                     // BEGIN HADRON
     632            0 :                     if let Err(e) = res.as_ref() {
     633            0 :                         if let Some(sql_state) = e.code() {
     634            0 :                             if sql_state.code() == "57014" {
     635            0 :                                 // SQL State 57014 (ERRCODE_QUERY_CANCELED) is used for statement timeouts.
     636            0 :                                 // Increment the counter whenever a statement timeout occurs. Timeouts on
     637            0 :                                 // this configuration path can only occur due to PS connectivity problems that
     638            0 :                                 // Postgres failed to recover from.
     639            0 :                                 COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.inc();
     640            0 :                             }
     641            0 :                         }
     642            0 :                     }
     643              :                     // END HADRON
     644              : 
     645            0 :                     res
     646            0 :                 }
     647            0 :                 .instrument(inspan)
     648            0 :             })
     649            0 :             .collect::<Vec<_>>();
     650              : 
     651            0 :         drop(ctx);
     652              : 
     653            0 :         for it in join_all(active_queries).await {
     654            0 :             drop(it?);
     655              :         }
     656              : 
     657            0 :         debug!("Completed phase {:?}", &apply_spec_phase);
     658              : 
     659            0 :         Ok(())
     660            0 :     }
     661            0 :     .instrument(span2)
     662            0 :     .await
     663            0 : }
     664              : 
     665              : /// Create a stream of operations to be executed for that phase of applying
     666              : /// changes.
     667              : ///
     668              : /// In the future we may generate a single stream of changes and then
     669              : /// sort/merge/batch execution, but for now this is a nice way to improve
     670              : /// batching behavior of the commands.
     671            0 : async fn get_operations<'a>(
     672            0 :     params: &'a ComputeNodeParams,
     673            0 :     spec: &'a ComputeSpec,
     674            0 :     ctx: &'a RwLock<MutableApplyContext>,
     675            0 :     jwks_roles: &'a HashSet<String>,
     676            0 :     apply_spec_phase: &'a ApplySpecPhase,
     677            0 : ) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
     678            0 :     match apply_spec_phase {
     679            0 :         ApplySpecPhase::CreatePrivilegedRole => Ok(Box::new(once(Operation {
     680            0 :             query: format!(
     681            0 :                 include_str!("sql/create_privileged_role.sql"),
     682              :                 privileged_role_name = params.privileged_role_name,
     683            0 :                 privileges = if params.lakebase_mode {
     684            0 :                     "CREATEDB CREATEROLE NOLOGIN BYPASSRLS"
     685              :                 } else {
     686            0 :                     "CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS"
     687              :                 }
     688              :             ),
     689            0 :             comment: None,
     690              :         }))),
     691              :         // BEGIN_HADRON
     692              :         // New Hadron phase
     693              :         ApplySpecPhase::CreateDatabricksRoles => {
     694            0 :             let queries = create_databricks_roles();
     695            0 :             let operations = queries.into_iter().map(|query| Operation {
     696            0 :                 query,
     697            0 :                 comment: None,
     698            0 :             });
     699            0 :             Ok(Box::new(operations))
     700              :         }
     701              : 
     702              :         // Backfill existing databricks_reader_* roles with statement timeout from GUC
     703              :         ApplySpecPhase::AlterDatabricksRoles => {
     704            0 :             let query = String::from(include_str!(
     705              :                 "sql/alter_databricks_reader_roles_timeout.sql"
     706              :             ));
     707              : 
     708            0 :             let operations = once(Operation {
     709            0 :                 query,
     710            0 :                 comment: Some(
     711            0 :                     "Backfill existing databricks_reader_* roles with statement timeout"
     712            0 :                         .to_string(),
     713            0 :                 ),
     714            0 :             });
     715              : 
     716            0 :             Ok(Box::new(operations))
     717              :         }
     718              :         // End of new Hadron Phase
     719              :         // END_HADRON
     720              :         ApplySpecPhase::DropInvalidDatabases => {
     721            0 :             let mut ctx = ctx.write().await;
     722            0 :             let databases = &mut ctx.dbs;
     723              : 
     724            0 :             let keys: Vec<_> = databases
     725            0 :                 .iter()
     726            0 :                 .filter(|(_, db)| db.invalid)
     727            0 :                 .map(|(dbname, _)| dbname.clone())
     728            0 :                 .collect();
     729              : 
     730              :             // After recent commit in Postgres, interrupted DROP DATABASE
     731              :             // leaves the database in the invalid state. According to the
     732              :             // commit message, the only option for user is to drop it again.
     733              :             // See:
     734              :             //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     735              :             //
     736              :             // Postgres Neon extension is done the way, that db is de-registered
     737              :             // in the control plane metadata only after it is dropped. So there is
     738              :             // a chance that it still thinks that the db should exist. This means
     739              :             // that it will be re-created by the `CreateDatabases` phase. This
     740              :             // is fine, as user can just drop the table again (in vanilla
     741              :             // Postgres they would need to do the same).
     742            0 :             let operations = keys
     743            0 :                 .into_iter()
     744            0 :                 .filter_map(move |dbname| ctx.dbs.remove(&dbname))
     745            0 :                 .map(|db| Operation {
     746            0 :                     query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()),
     747            0 :                     comment: Some(format!("Dropping invalid database {}", db.name)),
     748            0 :                 });
     749              : 
     750            0 :             Ok(Box::new(operations))
     751              :         }
     752              :         ApplySpecPhase::RenameRoles => {
     753            0 :             let mut ctx = ctx.write().await;
     754              : 
     755            0 :             let operations = spec
     756            0 :                 .delta_operations
     757            0 :                 .iter()
     758            0 :                 .flatten()
     759            0 :                 .filter(|op| op.action == "rename_role")
     760            0 :                 .filter_map(move |op| {
     761            0 :                     let roles = &mut ctx.roles;
     762              : 
     763            0 :                     if roles.contains_key(op.name.as_str()) {
     764            0 :                         None
     765              :                     } else {
     766            0 :                         let new_name = op.new_name.as_ref().unwrap();
     767            0 :                         let mut role = roles.remove(op.name.as_str()).unwrap();
     768              : 
     769            0 :                         role.name = new_name.clone();
     770            0 :                         role.encrypted_password = None;
     771            0 :                         roles.insert(role.name.clone(), role);
     772              : 
     773            0 :                         Some(Operation {
     774            0 :                             query: format!(
     775            0 :                                 "ALTER ROLE {} RENAME TO {}",
     776            0 :                                 op.name.pg_quote(),
     777            0 :                                 new_name.pg_quote()
     778            0 :                             ),
     779            0 :                             comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)),
     780            0 :                         })
     781              :                     }
     782            0 :                 });
     783              : 
     784            0 :             Ok(Box::new(operations))
     785              :         }
     786              :         ApplySpecPhase::CreateAndAlterRoles => {
     787            0 :             let mut ctx = ctx.write().await;
     788              : 
     789            0 :             let operations = spec.cluster.roles
     790            0 :                 .iter()
     791            0 :                 .filter_map(move |role| {
     792            0 :                     let roles = &mut ctx.roles;
     793            0 :                     let db_role = roles.get(&role.name);
     794              : 
     795            0 :                     match db_role {
     796            0 :                         Some(db_role) => {
     797            0 :                             if db_role.encrypted_password != role.encrypted_password {
     798              :                                 // This can be run on /every/ role! Not just ones created through the console.
     799              :                                 // This means that if you add some funny ALTER here that adds a permission,
     800              :                                 // this will get run even on user-created roles! This will result in different
     801              :                                 // behavior before and after a spec gets reapplied. The below ALTER as it stands
     802              :                                 // now only grants LOGIN and changes the password. Please do not allow this branch
     803              :                                 // to do anything silly.
     804            0 :                                 Some(Operation {
     805            0 :                                     query: format!(
     806            0 :                                         "ALTER ROLE {} {}",
     807            0 :                                         role.name.pg_quote(),
     808            0 :                                         role.to_pg_options(),
     809            0 :                                     ),
     810            0 :                                     comment: None,
     811            0 :                                 })
     812              :                             } else {
     813            0 :                                 None
     814              :                             }
     815              :                         }
     816              :                         None => {
     817            0 :                             let query = if !jwks_roles.contains(role.name.as_str()) {
     818            0 :                                 format!(
     819            0 :                                     "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE {} {}",
     820            0 :                                     role.name.pg_quote(),
     821              :                                     params.privileged_role_name,
     822            0 :                                     role.to_pg_options(),
     823              :                                 )
     824              :                             } else {
     825            0 :                                 format!(
     826            0 :                                     "CREATE ROLE {} {}",
     827            0 :                                     role.name.pg_quote(),
     828            0 :                                     role.to_pg_options(),
     829              :                                 )
     830              :                             };
     831            0 :                             Some(Operation {
     832            0 :                                 query,
     833            0 :                                 comment: Some(format!("creating role {}", role.name)),
     834            0 :                             })
     835              :                         }
     836              :                     }
     837            0 :                 });
     838              : 
     839            0 :             Ok(Box::new(operations))
     840              :         }
     841              :         ApplySpecPhase::RenameAndDeleteDatabases => {
     842            0 :             let mut ctx = ctx.write().await;
     843              : 
     844            0 :             let operations = spec
     845            0 :                 .delta_operations
     846            0 :                 .iter()
     847            0 :                 .flatten()
     848            0 :                 .filter_map(move |op| {
     849            0 :                     let databases = &mut ctx.dbs;
     850            0 :                     match op.action.as_str() {
     851              :                         // We do not check whether the DB exists or not,
     852              :                         // Postgres will take care of it for us
     853            0 :                         "delete_db" => {
     854            0 :                             let (db_name, outer_tag) = op.name.pg_quote_dollar();
     855              :                             // In Postgres we can't drop a database if it is a template.
     856              :                             // So we need to unset the template flag first, but it could
     857              :                             // be a retry, so we could've already dropped the database.
     858              :                             // Check that database exists first to make it idempotent.
     859            0 :                             let unset_template_query: String = format!(
     860            0 :                                 include_str!("sql/unset_template_for_drop_dbs.sql"),
     861              :                                 datname = db_name,
     862              :                                 outer_tag = outer_tag,
     863              :                             );
     864              : 
     865              :                             // Use FORCE to drop database even if there are active connections.
     866              :                             // We run this from `cloud_admin`, so it should have enough privileges.
     867              :                             //
     868              :                             // NB: there could be other db states, which prevent us from dropping
     869              :                             // the database. For example, if db is used by any active subscription
     870              :                             // or replication slot.
     871              :                             // Such cases are handled in the DropLogicalSubscriptions
     872              :                             // phase. We do all the cleanup before actually dropping the database.
     873            0 :                             let drop_db_query: String = format!(
     874            0 :                                 "DROP DATABASE IF EXISTS {} WITH (FORCE)",
     875            0 :                                 &op.name.pg_quote()
     876              :                             );
     877              : 
     878            0 :                             databases.remove(&op.name);
     879              : 
     880            0 :                             Some(vec![
     881            0 :                                 Operation {
     882            0 :                                     query: unset_template_query,
     883            0 :                                     comment: Some(format!(
     884            0 :                                         "optionally clearing template flags for DB {}",
     885            0 :                                         op.name,
     886            0 :                                     )),
     887            0 :                                 },
     888            0 :                                 Operation {
     889            0 :                                     query: drop_db_query,
     890            0 :                                     comment: Some(format!("deleting database {}", op.name,)),
     891            0 :                                 },
     892            0 :                             ])
     893              :                         }
     894            0 :                         "rename_db" => {
     895            0 :                             if let Some(mut db) = databases.remove(&op.name) {
     896              :                                 // update state of known databases
     897            0 :                                 let new_name = op.new_name.as_ref().unwrap();
     898            0 :                                 db.name = new_name.clone();
     899            0 :                                 databases.insert(db.name.clone(), db);
     900              : 
     901            0 :                                 Some(vec![Operation {
     902            0 :                                     query: format!(
     903            0 :                                         "ALTER DATABASE {} RENAME TO {}",
     904            0 :                                         op.name.pg_quote(),
     905            0 :                                         new_name.pg_quote(),
     906            0 :                                     ),
     907            0 :                                     comment: Some(format!(
     908            0 :                                         "renaming database '{}' to '{}'",
     909            0 :                                         op.name, new_name
     910            0 :                                     )),
     911            0 :                                 }])
     912              :                             } else {
     913            0 :                                 None
     914              :                             }
     915              :                         }
     916            0 :                         _ => None,
     917              :                     }
     918            0 :                 })
     919            0 :                 .flatten();
     920              : 
     921            0 :             Ok(Box::new(operations))
     922              :         }
     923              :         ApplySpecPhase::CreateAndAlterDatabases => {
     924            0 :             let mut ctx = ctx.write().await;
     925              : 
     926            0 :             let operations = spec
     927            0 :                 .cluster
     928            0 :                 .databases
     929            0 :                 .iter()
     930            0 :                 .filter_map(move |db| {
     931            0 :                     let databases = &mut ctx.dbs;
     932            0 :                     if let Some(edb) = databases.get_mut(&db.name) {
     933            0 :                         let change_owner = if edb.owner.starts_with('"') {
     934            0 :                             db.owner.pg_quote() != edb.owner
     935              :                         } else {
     936            0 :                             db.owner != edb.owner
     937              :                         };
     938              : 
     939            0 :                         edb.owner = db.owner.clone();
     940              : 
     941            0 :                         if change_owner {
     942            0 :                             Some(vec![Operation {
     943            0 :                                 query: format!(
     944            0 :                                     "ALTER DATABASE {} OWNER TO {}",
     945            0 :                                     db.name.pg_quote(),
     946            0 :                                     db.owner.pg_quote()
     947            0 :                                 ),
     948            0 :                                 comment: Some(format!(
     949            0 :                                     "changing database owner of database {} to {}",
     950            0 :                                     db.name, db.owner
     951            0 :                                 )),
     952            0 :                             }])
     953              :                         } else {
     954            0 :                             None
     955              :                         }
     956              :                     } else {
     957            0 :                         databases.insert(db.name.clone(), db.clone());
     958              : 
     959            0 :                         Some(vec![
     960            0 :                             Operation {
     961            0 :                                 query: format!(
     962            0 :                                     "CREATE DATABASE {} {}",
     963            0 :                                     db.name.pg_quote(),
     964            0 :                                     db.to_pg_options(),
     965            0 :                                 ),
     966            0 :                                 comment: None,
     967            0 :                             },
     968            0 :                             Operation {
     969            0 :                                 // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on the database
     970            0 :                                 // (see https://www.postgresql.org/docs/current/ddl-priv.html)
     971            0 :                                 query: format!(
     972            0 :                                     "GRANT ALL PRIVILEGES ON DATABASE {} TO {}",
     973            0 :                                     db.name.pg_quote(),
     974            0 :                                     params.privileged_role_name
     975            0 :                                 ),
     976            0 :                                 comment: None,
     977            0 :                             },
     978            0 :                         ])
     979              :                     }
     980            0 :                 })
     981            0 :                 .flatten();
     982              : 
     983            0 :             Ok(Box::new(operations))
     984              :         }
     985            0 :         ApplySpecPhase::CreateSchemaNeon => Ok(Box::new(once(Operation {
     986            0 :             query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
     987            0 :             comment: Some(String::from(
     988            0 :                 "create schema for neon extension and utils tables",
     989            0 :             )),
     990            0 :         }))),
     991            0 :         ApplySpecPhase::RunInEachDatabase { db, subphase } => {
     992              :             // Do some checks that user DB exists and we can access it.
     993              :             //
     994              :             // During the phases like DropLogicalSubscriptions, DeleteDBRoleReferences,
     995              :             // which happen before dropping the DB, the current run could be a retry,
     996              :             // so it's a valid case when DB is absent already. The case of
     997              :             // `pg_database.datallowconn = false`/`restrict_conn` is a bit tricky, as
     998              :             // in theory user can have some dangling objects there, so we will fail at
     999              :             // the actual drop later. Yet, to fix that in the current code we would need
    1000              :             // to ALTER DATABASE, and then check back, but that is even more invasive, so
    1001              :             // that's not what we really want to do here.
    1002              :             //
    1003              :             // For ChangeSchemaPerms, skipping DBs we cannot access is totally fine.
    1004            0 :             if let DB::UserDB(db) = db {
    1005            0 :                 let databases = &ctx.read().await.dbs;
    1006              : 
    1007            0 :                 let edb = match databases.get(&db.name) {
    1008            0 :                     Some(edb) => edb,
    1009              :                     None => {
    1010            0 :                         warn!(
    1011            0 :                             "skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL",
    1012              :                             subphase, db.name
    1013              :                         );
    1014            0 :                         return Ok(Box::new(empty()));
    1015              :                     }
    1016              :                 };
    1017              : 
    1018            0 :                 if edb.restrict_conn || edb.invalid {
    1019            0 :                     warn!(
    1020            0 :                         "skipping RunInEachDatabase phase {:?}, database {} is (restrict_conn={}, invalid={})",
    1021              :                         subphase, db.name, edb.restrict_conn, edb.invalid
    1022              :                     );
    1023            0 :                     return Ok(Box::new(empty()));
    1024            0 :                 }
    1025            0 :             }
    1026              : 
    1027            0 :             match subphase {
    1028              :                 PerDatabasePhase::DropLogicalSubscriptions => {
    1029            0 :                     match &db {
    1030            0 :                         DB::UserDB(db) => {
    1031            0 :                             let (db_name, outer_tag) = db.name.pg_quote_dollar();
    1032            0 :                             let drop_subscription_query: String = format!(
    1033            0 :                                 include_str!("sql/drop_subscriptions.sql"),
    1034              :                                 datname_str = db_name,
    1035              :                                 outer_tag = outer_tag,
    1036              :                             );
    1037              : 
    1038            0 :                             let operations = vec![Operation {
    1039            0 :                                 query: drop_subscription_query,
    1040            0 :                                 comment: Some(format!(
    1041            0 :                                     "optionally dropping subscriptions for DB {}",
    1042            0 :                                     db.name,
    1043            0 :                                 )),
    1044            0 :                             }]
    1045            0 :                             .into_iter();
    1046              : 
    1047            0 :                             Ok(Box::new(operations))
    1048              :                         }
    1049              :                         // skip this cleanup for the system databases
    1050              :                         // because users can't drop them
    1051            0 :                         DB::SystemDB => Ok(Box::new(empty())),
    1052              :                     }
    1053              :                 }
    1054              :                 PerDatabasePhase::DeleteDBRoleReferences => {
    1055            0 :                     let ctx = ctx.read().await;
    1056              : 
    1057            0 :                     let operations = spec
    1058            0 :                         .delta_operations
    1059            0 :                         .iter()
    1060            0 :                         .flatten()
    1061            0 :                         .filter(|op| op.action == "delete_role")
    1062            0 :                         .filter_map(move |op| {
    1063            0 :                             if db.is_owned_by(&op.name) {
    1064            0 :                                 return None;
    1065            0 :                             }
    1066            0 :                             if !ctx.roles.contains_key(&op.name) {
    1067            0 :                                 return None;
    1068            0 :                             }
    1069            0 :                             let quoted = op.name.pg_quote();
    1070            0 :                             let new_owner = match &db {
    1071            0 :                                 DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
    1072            0 :                                 DB::UserDB(db) => db.owner.pg_quote(),
    1073              :                             };
    1074            0 :                             let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
    1075              : 
    1076            0 :                             Some(vec![
    1077            0 :                                 // This will reassign all dependent objects to the db owner
    1078            0 :                                 Operation {
    1079            0 :                                     query: format!("REASSIGN OWNED BY {quoted} TO {new_owner}",),
    1080            0 :                                     comment: None,
    1081            0 :                                 },
    1082            0 :                                 // Revoke some potentially blocking privileges (Neon-specific currently)
    1083            0 :                                 Operation {
    1084            0 :                                     query: format!(
    1085            0 :                                         include_str!("sql/pre_drop_role_revoke_privileges.sql"),
    1086            0 :                                         // N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
    1087            0 :                                         role_name = escaped_role,
    1088            0 :                                         outer_tag = outer_tag,
    1089            0 :                                     )
    1090            0 :                                     // HADRON change:
    1091            0 :                                     .replace("neon_superuser", &params.privileged_role_name),
    1092            0 :                                     // HADRON change end
    1093            0 :                                     comment: None,
    1094            0 :                                 },
    1095            0 :                                 // This now will only drop privileges of the role
    1096            0 :                                 // TODO: this is obviously not 100% true because of the above case,
    1097            0 :                                 // there could still be some privileges that are not revoked. Maybe this
    1098            0 :                                 // only drops privileges that were granted *by this* role, not *to this* role,
    1099            0 :                                 // but this has to be checked.
    1100            0 :                                 Operation {
    1101            0 :                                     query: format!("DROP OWNED BY {quoted}"),
    1102            0 :                                     comment: None,
    1103            0 :                                 },
    1104            0 :                             ])
    1105            0 :                         })
    1106            0 :                         .flatten();
    1107              : 
    1108            0 :                     Ok(Box::new(operations))
    1109              :                 }
    1110              :                 PerDatabasePhase::ChangeSchemaPerms => {
    1111            0 :                     let db = match &db {
    1112              :                         // ignore schema permissions on the system database
    1113            0 :                         DB::SystemDB => return Ok(Box::new(empty())),
    1114            0 :                         DB::UserDB(db) => db,
    1115              :                     };
    1116            0 :                     let (db_owner, outer_tag) = db.owner.pg_quote_dollar();
    1117              : 
    1118            0 :                     let operations = vec![
    1119            0 :                         Operation {
    1120            0 :                             query: format!(
    1121            0 :                                 include_str!("sql/set_public_schema_owner.sql"),
    1122            0 :                                 db_owner = db_owner,
    1123            0 :                                 outer_tag = outer_tag,
    1124            0 :                             ),
    1125            0 :                             comment: None,
    1126            0 :                         },
    1127            0 :                         Operation {
    1128            0 :                             query: String::from(include_str!("sql/default_grants.sql"))
    1129            0 :                                 .replace("neon_superuser", &params.privileged_role_name),
    1130            0 :                             comment: None,
    1131            0 :                         },
    1132              :                     ]
    1133            0 :                     .into_iter();
    1134              : 
    1135            0 :                     Ok(Box::new(operations))
    1136              :                 }
    1137              :             }
    1138              :         }
    1139              :         // Interestingly, we only install pg_stat_statements in the main database, even when
    1140              :         // it's preloaded.
    1141              :         ApplySpecPhase::HandleOtherExtensions => {
    1142            0 :             if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
    1143            0 :                 if libs.contains("pg_stat_statements") {
    1144            0 :                     return Ok(Box::new(once(Operation {
    1145            0 :                         query: String::from(
    1146            0 :                             "CREATE EXTENSION IF NOT EXISTS pg_stat_statements WITH SCHEMA public",
    1147            0 :                         ),
    1148            0 :                         comment: Some(String::from("create system extensions")),
    1149            0 :                     })));
    1150            0 :                 }
    1151            0 :             }
    1152            0 :             Ok(Box::new(empty()))
    1153              :         }
    1154            0 :         ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
    1155            0 :             query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit WITH SCHEMA public"),
    1156            0 :             comment: Some(String::from("create pgaudit extensions")),
    1157            0 :         }))),
    1158            0 :         ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
    1159            0 :             query: String::from(
    1160            0 :                 "CREATE EXTENSION IF NOT EXISTS pgauditlogtofile WITH SCHEMA public",
    1161            0 :             ),
    1162            0 :             comment: Some(String::from("create pgauditlogtofile extensions")),
    1163            0 :         }))),
    1164              :         // Disable pgaudit logging for the `postgres` database.
    1165              :         // `postgres` is the Neon system database used by monitors
    1166              :         // and compute_ctl tuning functions and thus generates a lot of noise.
    1167              :         // We do not consider data stored in this database as sensitive.
    1168              :         ApplySpecPhase::DisablePostgresDBPgAudit => {
    1169            0 :             let query = "ALTER DATABASE postgres SET pgaudit.log to 'none'";
    1170            0 :             Ok(Box::new(once(Operation {
    1171            0 :                 query: query.to_string(),
    1172            0 :                 comment: Some(query.to_string()),
    1173            0 :             })))
    1174              :         }
    1175              :         ApplySpecPhase::HandleNeonExtension => {
    1176            0 :             let operations = vec![
    1177            0 :                 Operation {
    1178            0 :                     query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
    1179            0 :                     comment: Some(String::from(
    1180            0 :                         "init: install the extension if not already installed",
    1181            0 :                     )),
    1182            0 :                 },
    1183            0 :                 Operation {
    1184            0 :                     query: String::from(
    1185            0 :                         "UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name AND extrelocatable OPERATOR(pg_catalog.=) false",
    1186            0 :                     ),
    1187            0 :                     comment: Some(String::from("compat/fix: make neon relocatable")),
    1188            0 :                 },
    1189            0 :                 Operation {
    1190            0 :                     query: String::from("ALTER EXTENSION neon SET SCHEMA neon"),
    1191            0 :                     comment: Some(String::from("compat/fix: alter neon extension schema")),
    1192            0 :                 },
    1193            0 :                 Operation {
    1194            0 :                     query: String::from("ALTER EXTENSION neon UPDATE"),
    1195            0 :                     comment: Some(String::from("compat/update: update neon extension version")),
    1196            0 :                 },
    1197              :             ]
    1198            0 :             .into_iter();
    1199              : 
    1200            0 :             Ok(Box::new(operations))
    1201              :         }
    1202              :         // BEGIN_HADRON
    1203              :         // Note: we may want to version the extension someday, but for now we just drop it and recreate it.
    1204              :         ApplySpecPhase::HandleDatabricksAuthExtension => {
    1205            0 :             let operations = vec![
    1206            0 :                 Operation {
    1207            0 :                     query: String::from("DROP EXTENSION IF EXISTS databricks_auth"),
    1208            0 :                     comment: Some(String::from("dropping existing databricks_auth extension")),
    1209            0 :                 },
    1210            0 :                 Operation {
    1211            0 :                     query: String::from("CREATE EXTENSION databricks_auth"),
    1212            0 :                     comment: Some(String::from("creating databricks_auth extension")),
    1213            0 :                 },
    1214            0 :                 Operation {
    1215            0 :                     query: String::from("GRANT SELECT ON databricks_auth_metrics TO pg_monitor"),
    1216            0 :                     comment: Some(String::from("grant select on databricks auth counters")),
    1217            0 :                 },
    1218              :             ]
    1219            0 :             .into_iter();
    1220              : 
    1221            0 :             Ok(Box::new(operations))
    1222              :         }
    1223              :         // END_HADRON
    1224            0 :         ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
    1225            0 :             query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
    1226            0 :             comment: None,
    1227            0 :         }))),
    1228              :         ApplySpecPhase::DropRoles => {
    1229            0 :             let operations = spec
    1230            0 :                 .delta_operations
    1231            0 :                 .iter()
    1232            0 :                 .flatten()
    1233            0 :                 .filter(|op| op.action == "delete_role")
    1234            0 :                 .map(|op| Operation {
    1235            0 :                     query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()),
    1236            0 :                     comment: None,
    1237            0 :                 });
    1238              : 
    1239            0 :             Ok(Box::new(operations))
    1240              :         }
    1241              : 
    1242              :         // BEGIN_HADRON
    1243              :         // New Hadron phases
    1244              :         //
    1245              :         // Grants permissions to roles that are used by Databricks.
    1246              :         ApplySpecPhase::AddDatabricksGrants => {
    1247            0 :             let operations = vec![
    1248            0 :                 Operation {
    1249            0 :                     query: String::from("GRANT USAGE ON SCHEMA neon TO databricks_monitor"),
    1250            0 :                     comment: Some(String::from(
    1251            0 :                         "Permissions needed to execute neon.* functions (in the postgres database)",
    1252            0 :                     )),
    1253            0 :                 },
    1254            0 :                 Operation {
    1255            0 :                     query: String::from(
    1256            0 :                         "GRANT SELECT, INSERT, UPDATE ON health_check TO databricks_monitor",
    1257            0 :                     ),
    1258            0 :                     comment: Some(String::from("Permissions needed for read and write probes")),
    1259            0 :                 },
    1260            0 :                 Operation {
    1261            0 :                     query: String::from(
    1262            0 :                         "GRANT EXECUTE ON FUNCTION pg_ls_dir(text) TO databricks_monitor",
    1263            0 :                     ),
    1264            0 :                     comment: Some(String::from(
    1265            0 :                         "Permissions needed to monitor .snap file counts",
    1266            0 :                     )),
    1267            0 :                 },
    1268            0 :                 Operation {
    1269            0 :                     query: String::from(
    1270            0 :                         "GRANT SELECT ON neon.neon_perf_counters TO databricks_monitor",
    1271            0 :                     ),
    1272            0 :                     comment: Some(String::from(
    1273            0 :                         "Permissions needed to access neon performance counters view",
    1274            0 :                     )),
    1275            0 :                 },
    1276            0 :                 Operation {
    1277            0 :                     query: String::from(
    1278            0 :                         "GRANT EXECUTE ON FUNCTION neon.get_perf_counters() TO databricks_monitor",
    1279            0 :                     ),
    1280            0 :                     comment: Some(String::from(
    1281            0 :                         "Permissions needed to execute the underlying performance counters function",
    1282            0 :                     )),
    1283            0 :                 },
    1284              :             ]
    1285            0 :             .into_iter();
    1286              : 
    1287            0 :             Ok(Box::new(operations))
    1288              :         }
    1289              :         // Creates minor objects that are used by Databricks.
    1290            0 :         ApplySpecPhase::CreateDatabricksMisc => Ok(Box::new(once(Operation {
    1291            0 :             query: String::from(include_str!("sql/create_databricks_misc.sql")),
    1292            0 :             comment: Some(String::from(
    1293            0 :                 "The function databricks_monitor uses to convert exception to 0 or 1",
    1294            0 :             )),
    1295            0 :         }))),
    1296              :         // End of new Hadron phases
    1297              :         // END_HADRON
    1298            0 :         ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
    1299            0 :             query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
    1300            0 :             comment: None,
    1301            0 :         }))),
    1302              :     }
    1303            0 : }
        

Generated by: LCOV version 2.1-beta