Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::fmt::{Debug, Formatter};
3 : use std::future::Future;
4 : use std::iter::{empty, once};
5 : use std::sync::Arc;
6 :
7 : use anyhow::{Context, Result};
8 : use compute_api::responses::ComputeStatus;
9 : use compute_api::spec::{ComputeAudit, ComputeSpec, Database, PgIdent, Role};
10 : use futures::future::join_all;
11 : use tokio::sync::RwLock;
12 : use tokio_postgres::Client;
13 : use tokio_postgres::error::SqlState;
14 : use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
15 :
16 : use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState, create_databricks_roles};
17 : use crate::hadron_metrics::COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS;
18 : use crate::pg_helpers::{
19 : DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
20 : get_existing_roles_async,
21 : };
22 : use crate::spec_apply::ApplySpecPhase::{
23 : AddDatabricksGrants, AlterDatabricksRoles, CreateAndAlterDatabases, CreateAndAlterRoles,
24 : CreateAvailabilityCheck, CreateDatabricksMisc, CreateDatabricksRoles, CreatePgauditExtension,
25 : CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon,
26 : DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
27 : HandleDatabricksAuthExtension, HandleNeonExtension, HandleOtherExtensions,
28 : RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
29 : };
30 : use crate::spec_apply::PerDatabasePhase::{
31 : ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions,
32 : };
33 :
34 : impl ComputeNode {
35 : /// Apply the spec to the running PostgreSQL instance.
36 : /// The caller can decide to run with multiple clients in parallel, or
37 : /// single mode. Either way, the commands executed will be the same, and
38 : /// only commands run in different databases are parallelized.
39 : #[instrument(skip_all)]
40 : pub fn apply_spec_sql(
41 : &self,
42 : spec: Arc<ComputeSpec>,
43 : conf: Arc<tokio_postgres::Config>,
44 : concurrency: usize,
45 : ) -> Result<()> {
46 : info!("Applying config with max {} concurrency", concurrency);
47 : debug!("Config: {:?}", spec);
48 :
49 : let rt = tokio::runtime::Handle::current();
50 0 : rt.block_on(async {
51 : // Proceed with post-startup configuration. Note, that order of operations is important.
52 0 : let client = Self::get_maintenance_client(&conf).await?;
53 0 : let spec = spec.clone();
54 0 : let params = Arc::new(self.params.clone());
55 :
56 0 : let databases = get_existing_dbs_async(&client).await?;
57 0 : let roles = get_existing_roles_async(&client)
58 0 : .await?
59 0 : .into_iter()
60 0 : .map(|role| (role.name.clone(), role))
61 0 : .collect::<HashMap<String, Role>>();
62 :
63 : // Check if we need to drop subscriptions before starting the endpoint.
64 : //
65 : // It is important to do this operation exactly once when endpoint starts on a new branch.
66 : // Otherwise, we may drop not inherited, but newly created subscriptions.
67 : //
68 : // We cannot rely only on spec.drop_subscriptions_before_start flag,
69 : // because if for some reason compute restarts inside VM,
70 : // it will start again with the same spec and flag value.
71 : //
72 : // To handle this, we save the fact of the operation in the database
73 : // in the neon.drop_subscriptions_done table.
74 : // If the table does not exist, we assume that the operation was never performed, so we must do it.
75 : // If table exists, we check if the operation was performed on the current timelilne.
76 : //
77 0 : let mut drop_subscriptions_done = false;
78 :
79 0 : if spec.drop_subscriptions_before_start {
80 0 : let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
81 :
82 0 : info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
83 :
84 : drop_subscriptions_done = match
85 0 : client.query("select 1 from neon.drop_subscriptions_done where timeline_id OPERATOR(pg_catalog.=) $1", &[&timeline_id.to_string()]).await {
86 0 : Ok(result) => !result.is_empty(),
87 0 : Err(e) =>
88 : {
89 0 : match e.code() {
90 0 : Some(&SqlState::UNDEFINED_TABLE) => false,
91 : _ => {
92 : // We don't expect any other error here, except for the schema/table not existing
93 0 : error!("Error checking if drop subscription operation was already performed: {}", e);
94 0 : return Err(e.into());
95 : }
96 : }
97 : }
98 : }
99 0 : };
100 :
101 :
102 0 : let jwks_roles = Arc::new(
103 0 : spec.as_ref()
104 0 : .local_proxy_config
105 0 : .iter()
106 0 : .flat_map(|it| &it.jwks)
107 0 : .flatten()
108 0 : .flat_map(|setting| &setting.role_names)
109 0 : .cloned()
110 0 : .collect::<HashSet<_>>(),
111 : );
112 :
113 0 : let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
114 0 : roles,
115 0 : dbs: databases,
116 0 : }));
117 :
118 : // Apply special pre drop database phase.
119 : // NOTE: we use the code of RunInEachDatabase phase for parallelism
120 : // and connection management, but we don't really run it in *each* database,
121 : // only in databases, we're about to drop.
122 0 : info!("Applying PerDatabase (pre-dropdb) phase");
123 0 : let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
124 :
125 : // Run the phase for each database that we're about to drop.
126 0 : let db_processes = spec
127 0 : .delta_operations
128 0 : .iter()
129 0 : .flatten()
130 0 : .filter_map(move |op| {
131 0 : if op.action.as_str() == "delete_db" {
132 0 : Some(op.name.clone())
133 : } else {
134 0 : None
135 : }
136 0 : })
137 0 : .map(|dbname| {
138 0 : let spec = spec.clone();
139 0 : let ctx = ctx.clone();
140 0 : let jwks_roles = jwks_roles.clone();
141 0 : let mut conf = conf.as_ref().clone();
142 0 : let concurrency_token = concurrency_token.clone();
143 : // We only need dbname field for this phase, so set other fields to dummy values
144 0 : let db = DB::UserDB(Database {
145 0 : name: dbname.clone(),
146 0 : owner: "cloud_admin".to_string(),
147 0 : options: None,
148 0 : restrict_conn: false,
149 0 : invalid: false,
150 0 : });
151 :
152 0 : debug!("Applying per-database phases for Database {:?}", &db);
153 :
154 0 : match &db {
155 0 : DB::SystemDB => {}
156 0 : DB::UserDB(db) => {
157 0 : conf.dbname(db.name.as_str());
158 0 : }
159 : }
160 :
161 0 : let conf = Arc::new(conf);
162 0 : let fut = Self::apply_spec_sql_db(
163 0 : params.clone(),
164 0 : spec.clone(),
165 0 : conf,
166 0 : ctx.clone(),
167 0 : jwks_roles.clone(),
168 0 : concurrency_token.clone(),
169 0 : db,
170 0 : [DropLogicalSubscriptions].to_vec(),
171 0 : self.params.lakebase_mode,
172 : );
173 :
174 0 : Ok(tokio::spawn(fut))
175 0 : })
176 0 : .collect::<Vec<Result<_, anyhow::Error>>>();
177 :
178 0 : for process in db_processes.into_iter() {
179 0 : let handle = process?;
180 0 : if let Err(e) = handle.await? {
181 : // Handle the error case where the database does not exist
182 : // We do not check whether the DB exists or not in the deletion phase,
183 : // so we shouldn't be strict about it in pre-deletion cleanup as well.
184 0 : if e.to_string().contains("does not exist") {
185 0 : warn!("Error dropping subscription: {}", e);
186 : } else {
187 0 : return Err(e);
188 : }
189 0 : };
190 : }
191 :
192 0 : let phases = if self.params.lakebase_mode {
193 0 : vec![
194 0 : CreatePrivilegedRole,
195 : // BEGIN_HADRON
196 0 : CreateDatabricksRoles,
197 0 : AlterDatabricksRoles,
198 : // END_HADRON
199 0 : DropInvalidDatabases,
200 0 : RenameRoles,
201 0 : CreateAndAlterRoles,
202 0 : RenameAndDeleteDatabases,
203 0 : CreateAndAlterDatabases,
204 0 : CreateSchemaNeon,
205 : ]
206 : } else {
207 0 : vec![
208 0 : CreatePrivilegedRole,
209 0 : DropInvalidDatabases,
210 0 : RenameRoles,
211 0 : CreateAndAlterRoles,
212 0 : RenameAndDeleteDatabases,
213 0 : CreateAndAlterDatabases,
214 0 : CreateSchemaNeon,
215 : ]
216 : };
217 :
218 0 : for phase in phases {
219 0 : info!("Applying phase {:?}", &phase);
220 0 : apply_operations(
221 0 : params.clone(),
222 0 : spec.clone(),
223 0 : ctx.clone(),
224 0 : jwks_roles.clone(),
225 0 : phase,
226 0 : || async { Ok(&client) },
227 0 : self.params.lakebase_mode,
228 : )
229 0 : .await?;
230 : }
231 :
232 0 : info!("Applying RunInEachDatabase2 phase");
233 0 : let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
234 :
235 0 : let db_processes = spec
236 0 : .cluster
237 0 : .databases
238 0 : .iter()
239 0 : .map(|db| DB::new(db.clone()))
240 : // include
241 0 : .chain(once(DB::SystemDB))
242 0 : .map(|db| {
243 0 : let spec = spec.clone();
244 0 : let ctx = ctx.clone();
245 0 : let jwks_roles = jwks_roles.clone();
246 0 : let mut conf = conf.as_ref().clone();
247 0 : let concurrency_token = concurrency_token.clone();
248 0 : let db = db.clone();
249 :
250 0 : debug!("Applying per-database phases for Database {:?}", &db);
251 :
252 0 : match &db {
253 0 : DB::SystemDB => {}
254 0 : DB::UserDB(db) => {
255 0 : conf.dbname(db.name.as_str());
256 0 : }
257 : }
258 :
259 0 : let conf = Arc::new(conf);
260 0 : let mut phases = vec![
261 0 : DeleteDBRoleReferences,
262 0 : ChangeSchemaPerms,
263 : ];
264 :
265 0 : if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
266 0 : info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
267 0 : phases.push(DropLogicalSubscriptions);
268 0 : }
269 :
270 0 : let fut = Self::apply_spec_sql_db(
271 0 : params.clone(),
272 0 : spec.clone(),
273 0 : conf,
274 0 : ctx.clone(),
275 0 : jwks_roles.clone(),
276 0 : concurrency_token.clone(),
277 0 : db,
278 0 : phases,
279 0 : self.params.lakebase_mode,
280 : );
281 :
282 0 : Ok(tokio::spawn(fut))
283 0 : })
284 0 : .collect::<Vec<Result<_, anyhow::Error>>>();
285 :
286 0 : for process in db_processes.into_iter() {
287 0 : let handle = process?;
288 0 : handle.await??;
289 : }
290 :
291 0 : let mut phases = if self.params.lakebase_mode {
292 0 : vec![
293 0 : HandleOtherExtensions,
294 0 : HandleNeonExtension, // This step depends on CreateSchemaNeon
295 : // BEGIN_HADRON
296 0 : HandleDatabricksAuthExtension,
297 : // END_HADRON
298 0 : CreateAvailabilityCheck,
299 0 : DropRoles,
300 : // BEGIN_HADRON
301 0 : AddDatabricksGrants,
302 0 : CreateDatabricksMisc,
303 : // END_HADRON
304 : ]
305 : } else {
306 0 : vec![
307 0 : HandleOtherExtensions,
308 0 : HandleNeonExtension, // This step depends on CreateSchemaNeon
309 0 : CreateAvailabilityCheck,
310 0 : DropRoles,
311 : ]
312 : };
313 :
314 : // This step depends on CreateSchemaNeon
315 0 : if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
316 0 : info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
317 0 : phases.push(FinalizeDropLogicalSubscriptions);
318 0 : }
319 :
320 : // Keep DisablePostgresDBPgAudit phase at the end,
321 : // so that all config operations are audit logged.
322 0 : match spec.audit_log_level
323 : {
324 0 : ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
325 0 : phases.push(CreatePgauditExtension);
326 0 : phases.push(CreatePgauditlogtofileExtension);
327 0 : phases.push(DisablePostgresDBPgAudit);
328 0 : }
329 0 : ComputeAudit::Log | ComputeAudit::Base => {
330 0 : phases.push(CreatePgauditExtension);
331 0 : phases.push(DisablePostgresDBPgAudit);
332 0 : }
333 0 : ComputeAudit::Disabled => {}
334 : }
335 :
336 0 : for phase in phases {
337 0 : debug!("Applying phase {:?}", &phase);
338 0 : apply_operations(
339 0 : params.clone(),
340 0 : spec.clone(),
341 0 : ctx.clone(),
342 0 : jwks_roles.clone(),
343 0 : phase,
344 0 : || async { Ok(&client) },
345 0 : self.params.lakebase_mode,
346 : )
347 0 : .await?;
348 : }
349 :
350 0 : Ok::<(), anyhow::Error>(())
351 0 : })?;
352 :
353 : Ok(())
354 : }
355 :
356 : /// Apply SQL migrations of the RunInEachDatabase phase.
357 : ///
358 : /// May opt to not connect to databases that don't have any scheduled
359 : /// operations. The function is concurrency-controlled with the provided
360 : /// semaphore. The caller has to make sure the semaphore isn't exhausted.
361 : #[allow(clippy::too_many_arguments)] // TODO: needs bigger refactoring
362 0 : async fn apply_spec_sql_db(
363 0 : params: Arc<ComputeNodeParams>,
364 0 : spec: Arc<ComputeSpec>,
365 0 : conf: Arc<tokio_postgres::Config>,
366 0 : ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
367 0 : jwks_roles: Arc<HashSet<String>>,
368 0 : concurrency_token: Arc<tokio::sync::Semaphore>,
369 0 : db: DB,
370 0 : subphases: Vec<PerDatabasePhase>,
371 0 : lakebase_mode: bool,
372 0 : ) -> Result<()> {
373 0 : let _permit = concurrency_token.acquire().await?;
374 :
375 0 : let mut client_conn = None;
376 :
377 0 : for subphase in subphases {
378 0 : apply_operations(
379 0 : params.clone(),
380 0 : spec.clone(),
381 0 : ctx.clone(),
382 0 : jwks_roles.clone(),
383 0 : RunInEachDatabase {
384 0 : db: db.clone(),
385 0 : subphase,
386 0 : },
387 : // Only connect if apply_operation actually wants a connection.
388 : // It's quite possible this database doesn't need any queries,
389 : // so by not connecting we save time and effort connecting to
390 : // that database.
391 0 : || async {
392 0 : if client_conn.is_none() {
393 0 : let db_client = Self::get_maintenance_client(&conf).await?;
394 0 : client_conn.replace(db_client);
395 0 : }
396 0 : let client = client_conn.as_ref().unwrap();
397 0 : Ok(client)
398 0 : },
399 0 : lakebase_mode,
400 : )
401 0 : .await?;
402 : }
403 :
404 0 : drop(client_conn);
405 :
406 0 : Ok::<(), anyhow::Error>(())
407 0 : }
408 :
409 : /// Choose how many concurrent connections to use for applying the spec changes.
410 0 : pub fn max_service_connections(
411 0 : &self,
412 0 : compute_state: &ComputeState,
413 0 : spec: &ComputeSpec,
414 0 : ) -> usize {
415 : // If the cluster is in Init state we don't have to deal with user connections,
416 : // and can thus use all `max_connections` connection slots. However, that's generally not
417 : // very efficient, so we generally still limit it to a smaller number.
418 0 : if compute_state.status == ComputeStatus::Init {
419 : // If the settings contain 'max_connections', use that as template
420 0 : if let Some(config) = spec.cluster.settings.find("max_connections") {
421 0 : config.parse::<usize>().ok()
422 : } else {
423 : // Otherwise, try to find the setting in the postgresql_conf string
424 0 : spec.cluster
425 0 : .postgresql_conf
426 0 : .iter()
427 0 : .flat_map(|conf| conf.split("\n"))
428 0 : .filter_map(|line| {
429 0 : if !line.contains("max_connections") {
430 0 : return None;
431 0 : }
432 :
433 0 : let (key, value) = line.split_once("=")?;
434 0 : let key = key
435 0 : .trim_start_matches(char::is_whitespace)
436 0 : .trim_end_matches(char::is_whitespace);
437 :
438 0 : let value = value
439 0 : .trim_start_matches(char::is_whitespace)
440 0 : .trim_end_matches(char::is_whitespace);
441 :
442 0 : if key != "max_connections" {
443 0 : return None;
444 0 : }
445 :
446 0 : value.parse::<usize>().ok()
447 0 : })
448 0 : .next()
449 : }
450 : // If max_connections is present, use at most 1/3rd of that.
451 : // When max_connections is lower than 30, try to use at least 10 connections, but
452 : // never more than max_connections.
453 0 : .map(|limit| match limit {
454 0 : 0..10 => limit,
455 0 : 10..30 => 10,
456 0 : 30..300 => limit / 3,
457 0 : 300.. => 100,
458 0 : })
459 : // If we didn't find max_connections, default to 10 concurrent connections.
460 0 : .unwrap_or(10)
461 : } else {
462 : // state == Running
463 : // Because the cluster is already in the Running state, we should assume users are
464 : // already connected to the cluster, and high concurrency could negatively
465 : // impact user connectivity. Therefore, we can limit concurrency to the number of
466 : // reserved superuser connections, which users wouldn't be able to use anyway.
467 0 : spec.cluster
468 0 : .settings
469 0 : .find("superuser_reserved_connections")
470 0 : .iter()
471 0 : .filter_map(|val| val.parse::<usize>().ok())
472 0 : .map(|val| if val > 1 { val - 1 } else { 1 })
473 0 : .next_back()
474 0 : .unwrap_or(3)
475 : }
476 0 : }
477 : }
478 :
479 : #[derive(Clone)]
480 : pub enum DB {
481 : SystemDB,
482 : UserDB(Database),
483 : }
484 :
485 : impl DB {
486 0 : pub fn new(db: Database) -> DB {
487 0 : Self::UserDB(db)
488 0 : }
489 :
490 0 : pub fn is_owned_by(&self, role: &PgIdent) -> bool {
491 0 : match self {
492 0 : DB::SystemDB => false,
493 0 : DB::UserDB(db) => &db.owner == role,
494 : }
495 0 : }
496 : }
497 :
498 : impl Debug for DB {
499 0 : fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
500 0 : match self {
501 0 : DB::SystemDB => f.debug_tuple("SystemDB").finish(),
502 0 : DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(),
503 : }
504 0 : }
505 : }
506 :
/// Sub-steps that are executed inside an individual database as part of
/// [`ApplySpecPhase::RunInEachDatabase`].
#[derive(Copy, Clone, Debug)]
pub enum PerDatabasePhase {
    DeleteDBRoleReferences,
    ChangeSchemaPerms,
    /// Shared between two situations: i) removing dangling LR subscriptions
    /// right before a DB is dropped, and ii) removing all subscriptions after
    /// a fresh branch has been created.
    /// N.B. DBs that are not present in Postgres, are invalid, or have
    /// `datallowconn = false` (`restrict_conn`) are skipped.
    DropLogicalSubscriptions,
}
518 :
519 : #[derive(Clone, Debug)]
520 : pub enum ApplySpecPhase {
521 : CreatePrivilegedRole,
522 : // BEGIN_HADRON
523 : CreateDatabricksRoles,
524 : AlterDatabricksRoles,
525 : // END_HADRON
526 : DropInvalidDatabases,
527 : RenameRoles,
528 : CreateAndAlterRoles,
529 : RenameAndDeleteDatabases,
530 : CreateAndAlterDatabases,
531 : CreateSchemaNeon,
532 : RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
533 : CreatePgauditExtension,
534 : CreatePgauditlogtofileExtension,
535 : DisablePostgresDBPgAudit,
536 : HandleOtherExtensions,
537 : HandleNeonExtension,
538 : // BEGIN_HADRON
539 : HandleDatabricksAuthExtension,
540 : // END_HADRON
541 : CreateAvailabilityCheck,
542 : // BEGIN_HADRON
543 : AddDatabricksGrants,
544 : CreateDatabricksMisc,
545 : // END_HADRON
546 : DropRoles,
547 : FinalizeDropLogicalSubscriptions,
548 : }
549 :
/// A single SQL statement produced for an apply phase.
#[derive(Debug, Clone)]
pub struct Operation {
    /// SQL text that is sent to PostgreSQL as-is.
    pub query: String,
    /// Optional human-readable description, used for tracing the statement.
    pub comment: Option<String>,
}
554 :
555 : pub struct MutableApplyContext {
556 : pub roles: HashMap<String, Role>,
557 : pub dbs: HashMap<String, Database>,
558 : }
559 :
560 : /// Apply the operations that belong to the given spec apply phase.
561 : ///
562 : /// Commands within a single phase are executed in order of Iterator yield.
563 : /// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
564 : /// indicated by its `db` field, and can share a single client for all changes
565 : /// to that database.
566 : ///
567 : /// Notes:
568 : /// - Commands are pipelined, and thus may cause incomplete apply if one
569 : /// command of many fails.
570 : /// - Failing commands will fail the phase's apply step once the return value
571 : /// is processed.
572 : /// - No timeouts have (yet) been implemented.
573 : /// - The caller is responsible for limiting and/or applying concurrency.
574 0 : pub async fn apply_operations<'a, Fut, F>(
575 0 : params: Arc<ComputeNodeParams>,
576 0 : spec: Arc<ComputeSpec>,
577 0 : ctx: Arc<RwLock<MutableApplyContext>>,
578 0 : jwks_roles: Arc<HashSet<String>>,
579 0 : apply_spec_phase: ApplySpecPhase,
580 0 : client: F,
581 0 : lakebase_mode: bool,
582 0 : ) -> Result<()>
583 0 : where
584 0 : F: FnOnce() -> Fut,
585 0 : Fut: Future<Output = Result<&'a Client>>,
586 0 : {
587 0 : debug!("Starting phase {:?}", &apply_spec_phase);
588 0 : let span = info_span!("db_apply_changes", phase=?apply_spec_phase);
589 0 : let span2 = span.clone();
590 0 : async move {
591 0 : debug!("Processing phase {:?}", &apply_spec_phase);
592 0 : let ctx = ctx;
593 :
594 0 : let mut ops = get_operations(¶ms, &spec, &ctx, &jwks_roles, &apply_spec_phase)
595 0 : .await?
596 0 : .peekable();
597 :
598 : // Return (and by doing so, skip requesting the PostgreSQL client) if
599 : // we don't have any operations scheduled.
600 0 : if ops.peek().is_none() {
601 0 : return Ok(());
602 0 : }
603 :
604 0 : let client = client().await?;
605 :
606 0 : debug!("Applying phase {:?}", &apply_spec_phase);
607 :
608 0 : let active_queries = ops
609 0 : .map(|op| {
610 0 : let Operation { comment, query } = op;
611 0 : let inspan = match comment {
612 0 : None => span.clone(),
613 0 : Some(comment) => info_span!("phase {}: {}", comment),
614 : };
615 :
616 0 : async {
617 0 : let query = query;
618 0 : let res = client.simple_query(&query).await;
619 0 : debug!(
620 0 : "{} {}",
621 0 : if res.is_ok() {
622 0 : "successfully executed"
623 : } else {
624 0 : "failed to execute"
625 : },
626 : query
627 : );
628 0 : if !lakebase_mode {
629 0 : return res;
630 0 : }
631 : // BEGIN HADRON
632 0 : if let Err(e) = res.as_ref() {
633 0 : if let Some(sql_state) = e.code() {
634 0 : if sql_state.code() == "57014" {
635 0 : // SQL State 57014 (ERRCODE_QUERY_CANCELED) is used for statement timeouts.
636 0 : // Increment the counter whenever a statement timeout occurs. Timeouts on
637 0 : // this configuration path can only occur due to PS connectivity problems that
638 0 : // Postgres failed to recover from.
639 0 : COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.inc();
640 0 : }
641 0 : }
642 0 : }
643 : // END HADRON
644 :
645 0 : res
646 0 : }
647 0 : .instrument(inspan)
648 0 : })
649 0 : .collect::<Vec<_>>();
650 :
651 0 : drop(ctx);
652 :
653 0 : for it in join_all(active_queries).await {
654 0 : drop(it?);
655 : }
656 :
657 0 : debug!("Completed phase {:?}", &apply_spec_phase);
658 :
659 0 : Ok(())
660 0 : }
661 0 : .instrument(span2)
662 0 : .await
663 0 : }
664 :
665 : /// Create a stream of operations to be executed for that phase of applying
666 : /// changes.
667 : ///
668 : /// In the future we may generate a single stream of changes and then
669 : /// sort/merge/batch execution, but for now this is a nice way to improve
670 : /// batching behavior of the commands.
671 0 : async fn get_operations<'a>(
672 0 : params: &'a ComputeNodeParams,
673 0 : spec: &'a ComputeSpec,
674 0 : ctx: &'a RwLock<MutableApplyContext>,
675 0 : jwks_roles: &'a HashSet<String>,
676 0 : apply_spec_phase: &'a ApplySpecPhase,
677 0 : ) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
678 0 : match apply_spec_phase {
679 0 : ApplySpecPhase::CreatePrivilegedRole => Ok(Box::new(once(Operation {
680 0 : query: format!(
681 0 : include_str!("sql/create_privileged_role.sql"),
682 : privileged_role_name = params.privileged_role_name,
683 0 : privileges = if params.lakebase_mode {
684 0 : "CREATEDB CREATEROLE NOLOGIN BYPASSRLS"
685 : } else {
686 0 : "CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS"
687 : }
688 : ),
689 0 : comment: None,
690 : }))),
691 : // BEGIN_HADRON
692 : // New Hadron phase
693 : ApplySpecPhase::CreateDatabricksRoles => {
694 0 : let queries = create_databricks_roles();
695 0 : let operations = queries.into_iter().map(|query| Operation {
696 0 : query,
697 0 : comment: None,
698 0 : });
699 0 : Ok(Box::new(operations))
700 : }
701 :
702 : // Backfill existing databricks_reader_* roles with statement timeout from GUC
703 : ApplySpecPhase::AlterDatabricksRoles => {
704 0 : let query = String::from(include_str!(
705 : "sql/alter_databricks_reader_roles_timeout.sql"
706 : ));
707 :
708 0 : let operations = once(Operation {
709 0 : query,
710 0 : comment: Some(
711 0 : "Backfill existing databricks_reader_* roles with statement timeout"
712 0 : .to_string(),
713 0 : ),
714 0 : });
715 :
716 0 : Ok(Box::new(operations))
717 : }
718 : // End of new Hadron Phase
719 : // END_HADRON
720 : ApplySpecPhase::DropInvalidDatabases => {
721 0 : let mut ctx = ctx.write().await;
722 0 : let databases = &mut ctx.dbs;
723 :
724 0 : let keys: Vec<_> = databases
725 0 : .iter()
726 0 : .filter(|(_, db)| db.invalid)
727 0 : .map(|(dbname, _)| dbname.clone())
728 0 : .collect();
729 :
730 : // After recent commit in Postgres, interrupted DROP DATABASE
731 : // leaves the database in the invalid state. According to the
732 : // commit message, the only option for user is to drop it again.
733 : // See:
734 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
735 : //
736 : // Postgres Neon extension is done the way, that db is de-registered
737 : // in the control plane metadata only after it is dropped. So there is
738 : // a chance that it still thinks that the db should exist. This means
739 : // that it will be re-created by the `CreateDatabases` phase. This
740 : // is fine, as user can just drop the table again (in vanilla
741 : // Postgres they would need to do the same).
742 0 : let operations = keys
743 0 : .into_iter()
744 0 : .filter_map(move |dbname| ctx.dbs.remove(&dbname))
745 0 : .map(|db| Operation {
746 0 : query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()),
747 0 : comment: Some(format!("Dropping invalid database {}", db.name)),
748 0 : });
749 :
750 0 : Ok(Box::new(operations))
751 : }
752 : ApplySpecPhase::RenameRoles => {
753 0 : let mut ctx = ctx.write().await;
754 :
755 0 : let operations = spec
756 0 : .delta_operations
757 0 : .iter()
758 0 : .flatten()
759 0 : .filter(|op| op.action == "rename_role")
760 0 : .filter_map(move |op| {
761 0 : let roles = &mut ctx.roles;
762 :
763 0 : if roles.contains_key(op.name.as_str()) {
764 0 : None
765 : } else {
766 0 : let new_name = op.new_name.as_ref().unwrap();
767 0 : let mut role = roles.remove(op.name.as_str()).unwrap();
768 :
769 0 : role.name = new_name.clone();
770 0 : role.encrypted_password = None;
771 0 : roles.insert(role.name.clone(), role);
772 :
773 0 : Some(Operation {
774 0 : query: format!(
775 0 : "ALTER ROLE {} RENAME TO {}",
776 0 : op.name.pg_quote(),
777 0 : new_name.pg_quote()
778 0 : ),
779 0 : comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)),
780 0 : })
781 : }
782 0 : });
783 :
784 0 : Ok(Box::new(operations))
785 : }
786 : ApplySpecPhase::CreateAndAlterRoles => {
787 0 : let mut ctx = ctx.write().await;
788 :
789 0 : let operations = spec.cluster.roles
790 0 : .iter()
791 0 : .filter_map(move |role| {
792 0 : let roles = &mut ctx.roles;
793 0 : let db_role = roles.get(&role.name);
794 :
795 0 : match db_role {
796 0 : Some(db_role) => {
797 0 : if db_role.encrypted_password != role.encrypted_password {
798 : // This can be run on /every/ role! Not just ones created through the console.
799 : // This means that if you add some funny ALTER here that adds a permission,
800 : // this will get run even on user-created roles! This will result in different
801 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
802 : // now only grants LOGIN and changes the password. Please do not allow this branch
803 : // to do anything silly.
804 0 : Some(Operation {
805 0 : query: format!(
806 0 : "ALTER ROLE {} {}",
807 0 : role.name.pg_quote(),
808 0 : role.to_pg_options(),
809 0 : ),
810 0 : comment: None,
811 0 : })
812 : } else {
813 0 : None
814 : }
815 : }
816 : None => {
817 0 : let query = if !jwks_roles.contains(role.name.as_str()) {
818 0 : format!(
819 0 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE {} {}",
820 0 : role.name.pg_quote(),
821 : params.privileged_role_name,
822 0 : role.to_pg_options(),
823 : )
824 : } else {
825 0 : format!(
826 0 : "CREATE ROLE {} {}",
827 0 : role.name.pg_quote(),
828 0 : role.to_pg_options(),
829 : )
830 : };
831 0 : Some(Operation {
832 0 : query,
833 0 : comment: Some(format!("creating role {}", role.name)),
834 0 : })
835 : }
836 : }
837 0 : });
838 :
839 0 : Ok(Box::new(operations))
840 : }
841 : ApplySpecPhase::RenameAndDeleteDatabases => {
842 0 : let mut ctx = ctx.write().await;
843 :
844 0 : let operations = spec
845 0 : .delta_operations
846 0 : .iter()
847 0 : .flatten()
848 0 : .filter_map(move |op| {
849 0 : let databases = &mut ctx.dbs;
850 0 : match op.action.as_str() {
851 : // We do not check whether the DB exists or not,
852 : // Postgres will take care of it for us
853 0 : "delete_db" => {
854 0 : let (db_name, outer_tag) = op.name.pg_quote_dollar();
855 : // In Postgres we can't drop a database if it is a template.
856 : // So we need to unset the template flag first, but it could
857 : // be a retry, so we could've already dropped the database.
858 : // Check that database exists first to make it idempotent.
859 0 : let unset_template_query: String = format!(
860 0 : include_str!("sql/unset_template_for_drop_dbs.sql"),
861 : datname = db_name,
862 : outer_tag = outer_tag,
863 : );
864 :
865 : // Use FORCE to drop database even if there are active connections.
866 : // We run this from `cloud_admin`, so it should have enough privileges.
867 : //
868 : // NB: there could be other db states, which prevent us from dropping
869 : // the database. For example, if db is used by any active subscription
870 : // or replication slot.
871 : // Such cases are handled in the DropLogicalSubscriptions
872 : // phase. We do all the cleanup before actually dropping the database.
873 0 : let drop_db_query: String = format!(
874 0 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
875 0 : &op.name.pg_quote()
876 : );
877 :
878 0 : databases.remove(&op.name);
879 :
880 0 : Some(vec![
881 0 : Operation {
882 0 : query: unset_template_query,
883 0 : comment: Some(format!(
884 0 : "optionally clearing template flags for DB {}",
885 0 : op.name,
886 0 : )),
887 0 : },
888 0 : Operation {
889 0 : query: drop_db_query,
890 0 : comment: Some(format!("deleting database {}", op.name,)),
891 0 : },
892 0 : ])
893 : }
894 0 : "rename_db" => {
895 0 : if let Some(mut db) = databases.remove(&op.name) {
896 : // update state of known databases
897 0 : let new_name = op.new_name.as_ref().unwrap();
898 0 : db.name = new_name.clone();
899 0 : databases.insert(db.name.clone(), db);
900 :
901 0 : Some(vec![Operation {
902 0 : query: format!(
903 0 : "ALTER DATABASE {} RENAME TO {}",
904 0 : op.name.pg_quote(),
905 0 : new_name.pg_quote(),
906 0 : ),
907 0 : comment: Some(format!(
908 0 : "renaming database '{}' to '{}'",
909 0 : op.name, new_name
910 0 : )),
911 0 : }])
912 : } else {
913 0 : None
914 : }
915 : }
916 0 : _ => None,
917 : }
918 0 : })
919 0 : .flatten();
920 :
921 0 : Ok(Box::new(operations))
922 : }
923 : ApplySpecPhase::CreateAndAlterDatabases => {
924 0 : let mut ctx = ctx.write().await;
925 :
926 0 : let operations = spec
927 0 : .cluster
928 0 : .databases
929 0 : .iter()
930 0 : .filter_map(move |db| {
931 0 : let databases = &mut ctx.dbs;
932 0 : if let Some(edb) = databases.get_mut(&db.name) {
933 0 : let change_owner = if edb.owner.starts_with('"') {
934 0 : db.owner.pg_quote() != edb.owner
935 : } else {
936 0 : db.owner != edb.owner
937 : };
938 :
939 0 : edb.owner = db.owner.clone();
940 :
941 0 : if change_owner {
942 0 : Some(vec![Operation {
943 0 : query: format!(
944 0 : "ALTER DATABASE {} OWNER TO {}",
945 0 : db.name.pg_quote(),
946 0 : db.owner.pg_quote()
947 0 : ),
948 0 : comment: Some(format!(
949 0 : "changing database owner of database {} to {}",
950 0 : db.name, db.owner
951 0 : )),
952 0 : }])
953 : } else {
954 0 : None
955 : }
956 : } else {
957 0 : databases.insert(db.name.clone(), db.clone());
958 :
959 0 : Some(vec![
960 0 : Operation {
961 0 : query: format!(
962 0 : "CREATE DATABASE {} {}",
963 0 : db.name.pg_quote(),
964 0 : db.to_pg_options(),
965 0 : ),
966 0 : comment: None,
967 0 : },
968 0 : Operation {
969 0 : // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on the database
970 0 : // (see https://www.postgresql.org/docs/current/ddl-priv.html)
971 0 : query: format!(
972 0 : "GRANT ALL PRIVILEGES ON DATABASE {} TO {}",
973 0 : db.name.pg_quote(),
974 0 : params.privileged_role_name
975 0 : ),
976 0 : comment: None,
977 0 : },
978 0 : ])
979 : }
980 0 : })
981 0 : .flatten();
982 :
983 0 : Ok(Box::new(operations))
984 : }
985 0 : ApplySpecPhase::CreateSchemaNeon => Ok(Box::new(once(Operation {
986 0 : query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
987 0 : comment: Some(String::from(
988 0 : "create schema for neon extension and utils tables",
989 0 : )),
990 0 : }))),
991 0 : ApplySpecPhase::RunInEachDatabase { db, subphase } => {
992 : // Do some checks that user DB exists and we can access it.
993 : //
994 : // During the phases like DropLogicalSubscriptions, DeleteDBRoleReferences,
995 : // which happen before dropping the DB, the current run could be a retry,
996 : // so it's a valid case when DB is absent already. The case of
997 : // `pg_database.datallowconn = false`/`restrict_conn` is a bit tricky, as
998 : // in theory user can have some dangling objects there, so we will fail at
999 : // the actual drop later. Yet, to fix that in the current code we would need
1000 : // to ALTER DATABASE, and then check back, but that is even more invasive, so
1001 : // that's not what we really want to do here.
1002 : //
1003 : // For ChangeSchemaPerms, skipping DBs we cannot access is totally fine.
1004 0 : if let DB::UserDB(db) = db {
1005 0 : let databases = &ctx.read().await.dbs;
1006 :
1007 0 : let edb = match databases.get(&db.name) {
1008 0 : Some(edb) => edb,
1009 : None => {
1010 0 : warn!(
1011 0 : "skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL",
1012 : subphase, db.name
1013 : );
1014 0 : return Ok(Box::new(empty()));
1015 : }
1016 : };
1017 :
1018 0 : if edb.restrict_conn || edb.invalid {
1019 0 : warn!(
1020 0 : "skipping RunInEachDatabase phase {:?}, database {} is (restrict_conn={}, invalid={})",
1021 : subphase, db.name, edb.restrict_conn, edb.invalid
1022 : );
1023 0 : return Ok(Box::new(empty()));
1024 0 : }
1025 0 : }
1026 :
1027 0 : match subphase {
1028 : PerDatabasePhase::DropLogicalSubscriptions => {
1029 0 : match &db {
1030 0 : DB::UserDB(db) => {
1031 0 : let (db_name, outer_tag) = db.name.pg_quote_dollar();
1032 0 : let drop_subscription_query: String = format!(
1033 0 : include_str!("sql/drop_subscriptions.sql"),
1034 : datname_str = db_name,
1035 : outer_tag = outer_tag,
1036 : );
1037 :
1038 0 : let operations = vec![Operation {
1039 0 : query: drop_subscription_query,
1040 0 : comment: Some(format!(
1041 0 : "optionally dropping subscriptions for DB {}",
1042 0 : db.name,
1043 0 : )),
1044 0 : }]
1045 0 : .into_iter();
1046 :
1047 0 : Ok(Box::new(operations))
1048 : }
1049 : // skip this cleanup for the system databases
1050 : // because users can't drop them
1051 0 : DB::SystemDB => Ok(Box::new(empty())),
1052 : }
1053 : }
1054 : PerDatabasePhase::DeleteDBRoleReferences => {
1055 0 : let ctx = ctx.read().await;
1056 :
1057 0 : let operations = spec
1058 0 : .delta_operations
1059 0 : .iter()
1060 0 : .flatten()
1061 0 : .filter(|op| op.action == "delete_role")
1062 0 : .filter_map(move |op| {
1063 0 : if db.is_owned_by(&op.name) {
1064 0 : return None;
1065 0 : }
1066 0 : if !ctx.roles.contains_key(&op.name) {
1067 0 : return None;
1068 0 : }
1069 0 : let quoted = op.name.pg_quote();
1070 0 : let new_owner = match &db {
1071 0 : DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
1072 0 : DB::UserDB(db) => db.owner.pg_quote(),
1073 : };
1074 0 : let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
1075 :
1076 0 : Some(vec![
1077 0 : // This will reassign all dependent objects to the db owner
1078 0 : Operation {
1079 0 : query: format!("REASSIGN OWNED BY {quoted} TO {new_owner}",),
1080 0 : comment: None,
1081 0 : },
1082 0 : // Revoke some potentially blocking privileges (Neon-specific currently)
1083 0 : Operation {
1084 0 : query: format!(
1085 0 : include_str!("sql/pre_drop_role_revoke_privileges.sql"),
1086 0 : // N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
1087 0 : role_name = escaped_role,
1088 0 : outer_tag = outer_tag,
1089 0 : )
1090 0 : // HADRON change:
1091 0 : .replace("neon_superuser", ¶ms.privileged_role_name),
1092 0 : // HADRON change end ,
1093 0 : comment: None,
1094 0 : },
1095 0 : // This now will only drop privileges of the role
1096 0 : // TODO: this is obviously not 100% true because of the above case,
1097 0 : // there could be still some privileges that are not revoked. Maybe this
1098 0 : // only drops privileges that were granted *by this* role, not *to this* role,
1099 0 : // but this has to be checked.
1100 0 : Operation {
1101 0 : query: format!("DROP OWNED BY {quoted}"),
1102 0 : comment: None,
1103 0 : },
1104 0 : ])
1105 0 : })
1106 0 : .flatten();
1107 :
1108 0 : Ok(Box::new(operations))
1109 : }
1110 : PerDatabasePhase::ChangeSchemaPerms => {
1111 0 : let db = match &db {
1112 : // ignore schema permissions on the system database
1113 0 : DB::SystemDB => return Ok(Box::new(empty())),
1114 0 : DB::UserDB(db) => db,
1115 : };
1116 0 : let (db_owner, outer_tag) = db.owner.pg_quote_dollar();
1117 :
1118 0 : let operations = vec![
1119 0 : Operation {
1120 0 : query: format!(
1121 0 : include_str!("sql/set_public_schema_owner.sql"),
1122 0 : db_owner = db_owner,
1123 0 : outer_tag = outer_tag,
1124 0 : ),
1125 0 : comment: None,
1126 0 : },
1127 0 : Operation {
1128 0 : query: String::from(include_str!("sql/default_grants.sql"))
1129 0 : .replace("neon_superuser", ¶ms.privileged_role_name),
1130 0 : comment: None,
1131 0 : },
1132 : ]
1133 0 : .into_iter();
1134 :
1135 0 : Ok(Box::new(operations))
1136 : }
1137 : }
1138 : }
1139 : // Interestingly, we only install p_s_s in the main database, even when
1140 : // it's preloaded.
1141 : ApplySpecPhase::HandleOtherExtensions => {
1142 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1143 0 : if libs.contains("pg_stat_statements") {
1144 0 : return Ok(Box::new(once(Operation {
1145 0 : query: String::from(
1146 0 : "CREATE EXTENSION IF NOT EXISTS pg_stat_statements WITH SCHEMA public",
1147 0 : ),
1148 0 : comment: Some(String::from("create system extensions")),
1149 0 : })));
1150 0 : }
1151 0 : }
1152 0 : Ok(Box::new(empty()))
1153 : }
1154 0 : ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
1155 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit WITH SCHEMA public"),
1156 0 : comment: Some(String::from("create pgaudit extensions")),
1157 0 : }))),
1158 0 : ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
1159 0 : query: String::from(
1160 0 : "CREATE EXTENSION IF NOT EXISTS pgauditlogtofile WITH SCHEMA public",
1161 0 : ),
1162 0 : comment: Some(String::from("create pgauditlogtofile extensions")),
1163 0 : }))),
1164 : // Disable pgaudit logging for postgres database.
1165 : // Postgres is the neon system database used by monitors
1166 : // and compute_ctl tuning functions and thus generates a lot of noise.
1167 : // We do not consider data stored in this database as sensitive.
1168 : ApplySpecPhase::DisablePostgresDBPgAudit => {
1169 0 : let query = "ALTER DATABASE postgres SET pgaudit.log to 'none'";
1170 0 : Ok(Box::new(once(Operation {
1171 0 : query: query.to_string(),
1172 0 : comment: Some(query.to_string()),
1173 0 : })))
1174 : }
1175 : ApplySpecPhase::HandleNeonExtension => {
1176 0 : let operations = vec![
1177 0 : Operation {
1178 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
1179 0 : comment: Some(String::from(
1180 0 : "init: install the extension if not already installed",
1181 0 : )),
1182 0 : },
1183 0 : Operation {
1184 0 : query: String::from(
1185 0 : "UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name AND extrelocatable OPERATOR(pg_catalog.=) false",
1186 0 : ),
1187 0 : comment: Some(String::from("compat/fix: make neon relocatable")),
1188 0 : },
1189 0 : Operation {
1190 0 : query: String::from("ALTER EXTENSION neon SET SCHEMA neon"),
1191 0 : comment: Some(String::from("compat/fix: alter neon extension schema")),
1192 0 : },
1193 0 : Operation {
1194 0 : query: String::from("ALTER EXTENSION neon UPDATE"),
1195 0 : comment: Some(String::from("compat/update: update neon extension version")),
1196 0 : },
1197 : ]
1198 0 : .into_iter();
1199 :
1200 0 : Ok(Box::new(operations))
1201 : }
1202 : // BEGIN_HADRON
1203 : // Note: we may want to version the extension someday, but for now we just drop it and recreate it.
1204 : ApplySpecPhase::HandleDatabricksAuthExtension => {
1205 0 : let operations = vec![
1206 0 : Operation {
1207 0 : query: String::from("DROP EXTENSION IF EXISTS databricks_auth"),
1208 0 : comment: Some(String::from("dropping existing databricks_auth extension")),
1209 0 : },
1210 0 : Operation {
1211 0 : query: String::from("CREATE EXTENSION databricks_auth"),
1212 0 : comment: Some(String::from("creating databricks_auth extension")),
1213 0 : },
1214 0 : Operation {
1215 0 : query: String::from("GRANT SELECT ON databricks_auth_metrics TO pg_monitor"),
1216 0 : comment: Some(String::from("grant select on databricks auth counters")),
1217 0 : },
1218 : ]
1219 0 : .into_iter();
1220 :
1221 0 : Ok(Box::new(operations))
1222 : }
1223 : // END_HADRON
1224 0 : ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
1225 0 : query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
1226 0 : comment: None,
1227 0 : }))),
1228 : ApplySpecPhase::DropRoles => {
1229 0 : let operations = spec
1230 0 : .delta_operations
1231 0 : .iter()
1232 0 : .flatten()
1233 0 : .filter(|op| op.action == "delete_role")
1234 0 : .map(|op| Operation {
1235 0 : query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()),
1236 0 : comment: None,
1237 0 : });
1238 :
1239 0 : Ok(Box::new(operations))
1240 : }
1241 :
1242 : // BEGIN_HADRON
1243 : // New Hadron phases
1244 : //
1245 : // Grants permissions to roles that are used by Databricks.
1246 : ApplySpecPhase::AddDatabricksGrants => {
1247 0 : let operations = vec![
1248 0 : Operation {
1249 0 : query: String::from("GRANT USAGE ON SCHEMA neon TO databricks_monitor"),
1250 0 : comment: Some(String::from(
1251 0 : "Permissions needed to execute neon.* functions (in the postgres database)",
1252 0 : )),
1253 0 : },
1254 0 : Operation {
1255 0 : query: String::from(
1256 0 : "GRANT SELECT, INSERT, UPDATE ON health_check TO databricks_monitor",
1257 0 : ),
1258 0 : comment: Some(String::from("Permissions needed for read and write probes")),
1259 0 : },
1260 0 : Operation {
1261 0 : query: String::from(
1262 0 : "GRANT EXECUTE ON FUNCTION pg_ls_dir(text) TO databricks_monitor",
1263 0 : ),
1264 0 : comment: Some(String::from(
1265 0 : "Permissions needed to monitor .snap file counts",
1266 0 : )),
1267 0 : },
1268 0 : Operation {
1269 0 : query: String::from(
1270 0 : "GRANT SELECT ON neon.neon_perf_counters TO databricks_monitor",
1271 0 : ),
1272 0 : comment: Some(String::from(
1273 0 : "Permissions needed to access neon performance counters view",
1274 0 : )),
1275 0 : },
1276 0 : Operation {
1277 0 : query: String::from(
1278 0 : "GRANT EXECUTE ON FUNCTION neon.get_perf_counters() TO databricks_monitor",
1279 0 : ),
1280 0 : comment: Some(String::from(
1281 0 : "Permissions needed to execute the underlying performance counters function",
1282 0 : )),
1283 0 : },
1284 : ]
1285 0 : .into_iter();
1286 :
1287 0 : Ok(Box::new(operations))
1288 : }
1289 : // Creates minor objects that are used by Databricks.
1290 0 : ApplySpecPhase::CreateDatabricksMisc => Ok(Box::new(once(Operation {
1291 0 : query: String::from(include_str!("sql/create_databricks_misc.sql")),
1292 0 : comment: Some(String::from(
1293 0 : "The function databricks_monitor uses to convert exception to 0 or 1",
1294 0 : )),
1295 0 : }))),
1296 : // End of new Hadron phases
1297 : // END_HADRON
1298 0 : ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
1299 0 : query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
1300 0 : comment: None,
1301 0 : }))),
1302 : }
1303 0 : }
|