Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::fmt::{Debug, Formatter};
3 : use std::future::Future;
4 : use std::iter::{empty, once};
5 : use std::sync::Arc;
6 :
7 : use anyhow::{Context, Result};
8 : use compute_api::responses::ComputeStatus;
9 : use compute_api::spec::{ComputeAudit, ComputeSpec, Database, PgIdent, Role};
10 : use futures::future::join_all;
11 : use tokio::sync::RwLock;
12 : use tokio_postgres::Client;
13 : use tokio_postgres::error::SqlState;
14 : use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
15 :
16 : use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState};
17 : use crate::pg_helpers::{
18 : DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
19 : get_existing_roles_async,
20 : };
21 : use crate::spec_apply::ApplySpecPhase::{
22 : CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
23 : CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon,
24 : DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
25 : HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
26 : RunInEachDatabase,
27 : };
28 : use crate::spec_apply::PerDatabasePhase::{
29 : ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions,
30 : };
31 :
32 : impl ComputeNode {
33 : /// Apply the spec to the running PostgreSQL instance.
34 : /// The caller can decide to run with multiple clients in parallel, or
35 : /// single mode. Either way, the commands executed will be the same, and
36 : /// only commands run in different databases are parallelized.
37 : #[instrument(skip_all)]
38 : pub fn apply_spec_sql(
39 : &self,
40 : spec: Arc<ComputeSpec>,
41 : conf: Arc<tokio_postgres::Config>,
42 : concurrency: usize,
43 : ) -> Result<()> {
44 : info!("Applying config with max {} concurrency", concurrency);
45 : debug!("Config: {:?}", spec);
46 :
47 : let rt = tokio::runtime::Handle::current();
48 0 : rt.block_on(async {
49 : // Proceed with post-startup configuration. Note, that order of operations is important.
50 0 : let client = Self::get_maintenance_client(&conf).await?;
51 0 : let spec = spec.clone();
52 0 : let params = Arc::new(self.params.clone());
53 :
54 0 : let databases = get_existing_dbs_async(&client).await?;
55 0 : let roles = get_existing_roles_async(&client)
56 0 : .await?
57 0 : .into_iter()
58 0 : .map(|role| (role.name.clone(), role))
59 0 : .collect::<HashMap<String, Role>>();
60 :
61 : // Check if we need to drop subscriptions before starting the endpoint.
62 : //
63 : // It is important to do this operation exactly once when endpoint starts on a new branch.
64 : // Otherwise, we may drop not inherited, but newly created subscriptions.
65 : //
66 : // We cannot rely only on spec.drop_subscriptions_before_start flag,
67 : // because if for some reason compute restarts inside VM,
68 : // it will start again with the same spec and flag value.
69 : //
70 : // To handle this, we save the fact of the operation in the database
71 : // in the neon.drop_subscriptions_done table.
72 : // If the table does not exist, we assume that the operation was never performed, so we must do it.
73 : // If table exists, we check if the operation was performed on the current timelilne.
74 : //
75 0 : let mut drop_subscriptions_done = false;
76 :
77 0 : if spec.drop_subscriptions_before_start {
78 0 : let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
79 :
80 0 : info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
81 :
82 : drop_subscriptions_done = match
83 0 : client.query("select 1 from neon.drop_subscriptions_done where timeline_id = $1", &[&timeline_id.to_string()]).await {
84 0 : Ok(result) => !result.is_empty(),
85 0 : Err(e) =>
86 : {
87 0 : match e.code() {
88 0 : Some(&SqlState::UNDEFINED_TABLE) => false,
89 : _ => {
90 : // We don't expect any other error here, except for the schema/table not existing
91 0 : error!("Error checking if drop subscription operation was already performed: {}", e);
92 0 : return Err(e.into());
93 : }
94 : }
95 : }
96 : }
97 0 : };
98 :
99 :
100 0 : let jwks_roles = Arc::new(
101 0 : spec.as_ref()
102 0 : .local_proxy_config
103 0 : .iter()
104 0 : .flat_map(|it| &it.jwks)
105 0 : .flatten()
106 0 : .flat_map(|setting| &setting.role_names)
107 0 : .cloned()
108 0 : .collect::<HashSet<_>>(),
109 : );
110 :
111 0 : let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
112 0 : roles,
113 0 : dbs: databases,
114 0 : }));
115 :
116 : // Apply special pre drop database phase.
117 : // NOTE: we use the code of RunInEachDatabase phase for parallelism
118 : // and connection management, but we don't really run it in *each* database,
119 : // only in databases, we're about to drop.
120 0 : info!("Applying PerDatabase (pre-dropdb) phase");
121 0 : let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
122 :
123 : // Run the phase for each database that we're about to drop.
124 0 : let db_processes = spec
125 0 : .delta_operations
126 0 : .iter()
127 0 : .flatten()
128 0 : .filter_map(move |op| {
129 0 : if op.action.as_str() == "delete_db" {
130 0 : Some(op.name.clone())
131 : } else {
132 0 : None
133 : }
134 0 : })
135 0 : .map(|dbname| {
136 0 : let spec = spec.clone();
137 0 : let ctx = ctx.clone();
138 0 : let jwks_roles = jwks_roles.clone();
139 0 : let mut conf = conf.as_ref().clone();
140 0 : let concurrency_token = concurrency_token.clone();
141 : // We only need dbname field for this phase, so set other fields to dummy values
142 0 : let db = DB::UserDB(Database {
143 0 : name: dbname.clone(),
144 0 : owner: "cloud_admin".to_string(),
145 0 : options: None,
146 0 : restrict_conn: false,
147 0 : invalid: false,
148 0 : });
149 :
150 0 : debug!("Applying per-database phases for Database {:?}", &db);
151 :
152 0 : match &db {
153 0 : DB::SystemDB => {}
154 0 : DB::UserDB(db) => {
155 0 : conf.dbname(db.name.as_str());
156 0 : }
157 : }
158 :
159 0 : let conf = Arc::new(conf);
160 0 : let fut = Self::apply_spec_sql_db(
161 0 : params.clone(),
162 0 : spec.clone(),
163 0 : conf,
164 0 : ctx.clone(),
165 0 : jwks_roles.clone(),
166 0 : concurrency_token.clone(),
167 0 : db,
168 0 : [DropLogicalSubscriptions].to_vec(),
169 : );
170 :
171 0 : Ok(tokio::spawn(fut))
172 0 : })
173 0 : .collect::<Vec<Result<_, anyhow::Error>>>();
174 :
175 0 : for process in db_processes.into_iter() {
176 0 : let handle = process?;
177 0 : if let Err(e) = handle.await? {
178 : // Handle the error case where the database does not exist
179 : // We do not check whether the DB exists or not in the deletion phase,
180 : // so we shouldn't be strict about it in pre-deletion cleanup as well.
181 0 : if e.to_string().contains("does not exist") {
182 0 : warn!("Error dropping subscription: {}", e);
183 : } else {
184 0 : return Err(e);
185 : }
186 0 : };
187 : }
188 :
189 0 : for phase in [
190 0 : CreatePrivilegedRole,
191 0 : DropInvalidDatabases,
192 0 : RenameRoles,
193 0 : CreateAndAlterRoles,
194 0 : RenameAndDeleteDatabases,
195 0 : CreateAndAlterDatabases,
196 0 : CreateSchemaNeon,
197 : ] {
198 0 : info!("Applying phase {:?}", &phase);
199 0 : apply_operations(
200 0 : params.clone(),
201 0 : spec.clone(),
202 0 : ctx.clone(),
203 0 : jwks_roles.clone(),
204 0 : phase,
205 0 : || async { Ok(&client) },
206 : )
207 0 : .await?;
208 : }
209 :
210 0 : info!("Applying RunInEachDatabase2 phase");
211 0 : let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
212 :
213 0 : let db_processes = spec
214 0 : .cluster
215 0 : .databases
216 0 : .iter()
217 0 : .map(|db| DB::new(db.clone()))
218 : // include
219 0 : .chain(once(DB::SystemDB))
220 0 : .map(|db| {
221 0 : let spec = spec.clone();
222 0 : let ctx = ctx.clone();
223 0 : let jwks_roles = jwks_roles.clone();
224 0 : let mut conf = conf.as_ref().clone();
225 0 : let concurrency_token = concurrency_token.clone();
226 0 : let db = db.clone();
227 :
228 0 : debug!("Applying per-database phases for Database {:?}", &db);
229 :
230 0 : match &db {
231 0 : DB::SystemDB => {}
232 0 : DB::UserDB(db) => {
233 0 : conf.dbname(db.name.as_str());
234 0 : }
235 : }
236 :
237 0 : let conf = Arc::new(conf);
238 0 : let mut phases = vec![
239 0 : DeleteDBRoleReferences,
240 0 : ChangeSchemaPerms,
241 : ];
242 :
243 0 : if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
244 0 : info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
245 0 : phases.push(DropLogicalSubscriptions);
246 0 : }
247 :
248 0 : let fut = Self::apply_spec_sql_db(
249 0 : params.clone(),
250 0 : spec.clone(),
251 0 : conf,
252 0 : ctx.clone(),
253 0 : jwks_roles.clone(),
254 0 : concurrency_token.clone(),
255 0 : db,
256 0 : phases,
257 : );
258 :
259 0 : Ok(tokio::spawn(fut))
260 0 : })
261 0 : .collect::<Vec<Result<_, anyhow::Error>>>();
262 :
263 0 : for process in db_processes.into_iter() {
264 0 : let handle = process?;
265 0 : handle.await??;
266 : }
267 :
268 0 : let mut phases = vec![
269 0 : HandleOtherExtensions,
270 0 : HandleNeonExtension, // This step depends on CreateSchemaNeon
271 0 : CreateAvailabilityCheck,
272 0 : DropRoles,
273 : ];
274 :
275 : // This step depends on CreateSchemaNeon
276 0 : if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
277 0 : info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
278 0 : phases.push(FinalizeDropLogicalSubscriptions);
279 0 : }
280 :
281 : // Keep DisablePostgresDBPgAudit phase at the end,
282 : // so that all config operations are audit logged.
283 0 : match spec.audit_log_level
284 : {
285 0 : ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
286 0 : phases.push(CreatePgauditExtension);
287 0 : phases.push(CreatePgauditlogtofileExtension);
288 0 : phases.push(DisablePostgresDBPgAudit);
289 0 : }
290 0 : ComputeAudit::Log | ComputeAudit::Base => {
291 0 : phases.push(CreatePgauditExtension);
292 0 : phases.push(DisablePostgresDBPgAudit);
293 0 : }
294 0 : ComputeAudit::Disabled => {}
295 : }
296 :
297 0 : for phase in phases {
298 0 : debug!("Applying phase {:?}", &phase);
299 0 : apply_operations(
300 0 : params.clone(),
301 0 : spec.clone(),
302 0 : ctx.clone(),
303 0 : jwks_roles.clone(),
304 0 : phase,
305 0 : || async { Ok(&client) },
306 : )
307 0 : .await?;
308 : }
309 :
310 0 : Ok::<(), anyhow::Error>(())
311 0 : })?;
312 :
313 : Ok(())
314 : }
315 :
316 : /// Apply SQL migrations of the RunInEachDatabase phase.
317 : ///
318 : /// May opt to not connect to databases that don't have any scheduled
319 : /// operations. The function is concurrency-controlled with the provided
320 : /// semaphore. The caller has to make sure the semaphore isn't exhausted.
321 : #[allow(clippy::too_many_arguments)] // TODO: needs bigger refactoring
322 0 : async fn apply_spec_sql_db(
323 0 : params: Arc<ComputeNodeParams>,
324 0 : spec: Arc<ComputeSpec>,
325 0 : conf: Arc<tokio_postgres::Config>,
326 0 : ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
327 0 : jwks_roles: Arc<HashSet<String>>,
328 0 : concurrency_token: Arc<tokio::sync::Semaphore>,
329 0 : db: DB,
330 0 : subphases: Vec<PerDatabasePhase>,
331 0 : ) -> Result<()> {
332 0 : let _permit = concurrency_token.acquire().await?;
333 :
334 0 : let mut client_conn = None;
335 :
336 0 : for subphase in subphases {
337 0 : apply_operations(
338 0 : params.clone(),
339 0 : spec.clone(),
340 0 : ctx.clone(),
341 0 : jwks_roles.clone(),
342 0 : RunInEachDatabase {
343 0 : db: db.clone(),
344 0 : subphase,
345 0 : },
346 : // Only connect if apply_operation actually wants a connection.
347 : // It's quite possible this database doesn't need any queries,
348 : // so by not connecting we save time and effort connecting to
349 : // that database.
350 0 : || async {
351 0 : if client_conn.is_none() {
352 0 : let db_client = Self::get_maintenance_client(&conf).await?;
353 0 : client_conn.replace(db_client);
354 0 : }
355 0 : let client = client_conn.as_ref().unwrap();
356 0 : Ok(client)
357 0 : },
358 : )
359 0 : .await?;
360 : }
361 :
362 0 : drop(client_conn);
363 :
364 0 : Ok::<(), anyhow::Error>(())
365 0 : }
366 :
367 : /// Choose how many concurrent connections to use for applying the spec changes.
368 0 : pub fn max_service_connections(
369 0 : &self,
370 0 : compute_state: &ComputeState,
371 0 : spec: &ComputeSpec,
372 0 : ) -> usize {
373 : // If the cluster is in Init state we don't have to deal with user connections,
374 : // and can thus use all `max_connections` connection slots. However, that's generally not
375 : // very efficient, so we generally still limit it to a smaller number.
376 0 : if compute_state.status == ComputeStatus::Init {
377 : // If the settings contain 'max_connections', use that as template
378 0 : if let Some(config) = spec.cluster.settings.find("max_connections") {
379 0 : config.parse::<usize>().ok()
380 : } else {
381 : // Otherwise, try to find the setting in the postgresql_conf string
382 0 : spec.cluster
383 0 : .postgresql_conf
384 0 : .iter()
385 0 : .flat_map(|conf| conf.split("\n"))
386 0 : .filter_map(|line| {
387 0 : if !line.contains("max_connections") {
388 0 : return None;
389 0 : }
390 :
391 0 : let (key, value) = line.split_once("=")?;
392 0 : let key = key
393 0 : .trim_start_matches(char::is_whitespace)
394 0 : .trim_end_matches(char::is_whitespace);
395 :
396 0 : let value = value
397 0 : .trim_start_matches(char::is_whitespace)
398 0 : .trim_end_matches(char::is_whitespace);
399 :
400 0 : if key != "max_connections" {
401 0 : return None;
402 0 : }
403 :
404 0 : value.parse::<usize>().ok()
405 0 : })
406 0 : .next()
407 : }
408 : // If max_connections is present, use at most 1/3rd of that.
409 : // When max_connections is lower than 30, try to use at least 10 connections, but
410 : // never more than max_connections.
411 0 : .map(|limit| match limit {
412 0 : 0..10 => limit,
413 0 : 10..30 => 10,
414 0 : 30..300 => limit / 3,
415 0 : 300.. => 100,
416 0 : })
417 : // If we didn't find max_connections, default to 10 concurrent connections.
418 0 : .unwrap_or(10)
419 : } else {
420 : // state == Running
421 : // Because the cluster is already in the Running state, we should assume users are
422 : // already connected to the cluster, and high concurrency could negatively
423 : // impact user connectivity. Therefore, we can limit concurrency to the number of
424 : // reserved superuser connections, which users wouldn't be able to use anyway.
425 0 : spec.cluster
426 0 : .settings
427 0 : .find("superuser_reserved_connections")
428 0 : .iter()
429 0 : .filter_map(|val| val.parse::<usize>().ok())
430 0 : .map(|val| if val > 1 { val - 1 } else { 1 })
431 0 : .next_back()
432 0 : .unwrap_or(3)
433 : }
434 0 : }
435 : }
436 :
437 : #[derive(Clone)]
438 : pub enum DB {
439 : SystemDB,
440 : UserDB(Database),
441 : }
442 :
443 : impl DB {
444 0 : pub fn new(db: Database) -> DB {
445 0 : Self::UserDB(db)
446 0 : }
447 :
448 0 : pub fn is_owned_by(&self, role: &PgIdent) -> bool {
449 0 : match self {
450 0 : DB::SystemDB => false,
451 0 : DB::UserDB(db) => &db.owner == role,
452 : }
453 0 : }
454 : }
455 :
456 : impl Debug for DB {
457 0 : fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
458 0 : match self {
459 0 : DB::SystemDB => f.debug_tuple("SystemDB").finish(),
460 0 : DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(),
461 : }
462 0 : }
463 : }
464 :
/// Sub-phases that are executed inside an individual database.
#[derive(Clone, Copy, Debug)]
pub enum PerDatabasePhase {
    /// For every `delete_role` delta operation: reassign the role's objects
    /// to the database owner, revoke its privileges and drop what it owns.
    DeleteDBRoleReferences,
    /// Set the owner of the public schema and apply the default grants.
    /// Not applied to the system database.
    ChangeSchemaPerms,
    /// This is a shared phase, used for both i) dropping dangling LR subscriptions
    /// before dropping the DB, and ii) dropping all subscriptions after creating
    /// a fresh branch.
    /// N.B. we will skip all DBs that are not present in Postgres, invalid, or
    /// have `datallowconn = false` (`restrict_conn`).
    DropLogicalSubscriptions,
}
476 :
477 : #[derive(Clone, Debug)]
478 : pub enum ApplySpecPhase {
479 : CreatePrivilegedRole,
480 : DropInvalidDatabases,
481 : RenameRoles,
482 : CreateAndAlterRoles,
483 : RenameAndDeleteDatabases,
484 : CreateAndAlterDatabases,
485 : CreateSchemaNeon,
486 : RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
487 : CreatePgauditExtension,
488 : CreatePgauditlogtofileExtension,
489 : DisablePostgresDBPgAudit,
490 : HandleOtherExtensions,
491 : HandleNeonExtension,
492 : CreateAvailabilityCheck,
493 : DropRoles,
494 : FinalizeDropLogicalSubscriptions,
495 : }
496 :
/// A single SQL command produced by a phase.
pub struct Operation {
    /// SQL text, executed through the simple query protocol.
    pub query: String,
    /// Optional human-readable description, attached to the tracing span of
    /// the query when present.
    pub comment: Option<String>,
}
501 :
502 : pub struct MutableApplyContext {
503 : pub roles: HashMap<String, Role>,
504 : pub dbs: HashMap<String, Database>,
505 : }
506 :
507 : /// Apply the operations that belong to the given spec apply phase.
508 : ///
509 : /// Commands within a single phase are executed in order of Iterator yield.
510 : /// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
511 : /// indicated by its `db` field, and can share a single client for all changes
512 : /// to that database.
513 : ///
514 : /// Notes:
515 : /// - Commands are pipelined, and thus may cause incomplete apply if one
516 : /// command of many fails.
517 : /// - Failing commands will fail the phase's apply step once the return value
518 : /// is processed.
519 : /// - No timeouts have (yet) been implemented.
520 : /// - The caller is responsible for limiting and/or applying concurrency.
521 0 : pub async fn apply_operations<'a, Fut, F>(
522 0 : params: Arc<ComputeNodeParams>,
523 0 : spec: Arc<ComputeSpec>,
524 0 : ctx: Arc<RwLock<MutableApplyContext>>,
525 0 : jwks_roles: Arc<HashSet<String>>,
526 0 : apply_spec_phase: ApplySpecPhase,
527 0 : client: F,
528 0 : ) -> Result<()>
529 0 : where
530 0 : F: FnOnce() -> Fut,
531 0 : Fut: Future<Output = Result<&'a Client>>,
532 0 : {
533 0 : debug!("Starting phase {:?}", &apply_spec_phase);
534 0 : let span = info_span!("db_apply_changes", phase=?apply_spec_phase);
535 0 : let span2 = span.clone();
536 0 : async move {
537 0 : debug!("Processing phase {:?}", &apply_spec_phase);
538 0 : let ctx = ctx;
539 :
540 0 : let mut ops = get_operations(¶ms, &spec, &ctx, &jwks_roles, &apply_spec_phase)
541 0 : .await?
542 0 : .peekable();
543 :
544 : // Return (and by doing so, skip requesting the PostgreSQL client) if
545 : // we don't have any operations scheduled.
546 0 : if ops.peek().is_none() {
547 0 : return Ok(());
548 0 : }
549 :
550 0 : let client = client().await?;
551 :
552 0 : debug!("Applying phase {:?}", &apply_spec_phase);
553 :
554 0 : let active_queries = ops
555 0 : .map(|op| {
556 0 : let Operation { comment, query } = op;
557 0 : let inspan = match comment {
558 0 : None => span.clone(),
559 0 : Some(comment) => info_span!("phase {}: {}", comment),
560 : };
561 :
562 0 : async {
563 0 : let query = query;
564 0 : let res = client.simple_query(&query).await;
565 0 : debug!(
566 0 : "{} {}",
567 0 : if res.is_ok() {
568 0 : "successfully executed"
569 : } else {
570 0 : "failed to execute"
571 : },
572 : query
573 : );
574 0 : res
575 0 : }
576 0 : .instrument(inspan)
577 0 : })
578 0 : .collect::<Vec<_>>();
579 :
580 0 : drop(ctx);
581 :
582 0 : for it in join_all(active_queries).await {
583 0 : drop(it?);
584 : }
585 :
586 0 : debug!("Completed phase {:?}", &apply_spec_phase);
587 :
588 0 : Ok(())
589 0 : }
590 0 : .instrument(span2)
591 0 : .await
592 0 : }
593 :
594 : /// Create a stream of operations to be executed for that phase of applying
595 : /// changes.
596 : ///
597 : /// In the future we may generate a single stream of changes and then
598 : /// sort/merge/batch execution, but for now this is a nice way to improve
599 : /// batching behavior of the commands.
600 0 : async fn get_operations<'a>(
601 0 : params: &'a ComputeNodeParams,
602 0 : spec: &'a ComputeSpec,
603 0 : ctx: &'a RwLock<MutableApplyContext>,
604 0 : jwks_roles: &'a HashSet<String>,
605 0 : apply_spec_phase: &'a ApplySpecPhase,
606 0 : ) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
607 0 : match apply_spec_phase {
608 0 : ApplySpecPhase::CreatePrivilegedRole => Ok(Box::new(once(Operation {
609 0 : query: format!(
610 0 : include_str!("sql/create_privileged_role.sql"),
611 0 : privileged_role_name = params.privileged_role_name
612 0 : ),
613 0 : comment: None,
614 0 : }))),
615 : ApplySpecPhase::DropInvalidDatabases => {
616 0 : let mut ctx = ctx.write().await;
617 0 : let databases = &mut ctx.dbs;
618 :
619 0 : let keys: Vec<_> = databases
620 0 : .iter()
621 0 : .filter(|(_, db)| db.invalid)
622 0 : .map(|(dbname, _)| dbname.clone())
623 0 : .collect();
624 :
625 : // After recent commit in Postgres, interrupted DROP DATABASE
626 : // leaves the database in the invalid state. According to the
627 : // commit message, the only option for user is to drop it again.
628 : // See:
629 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
630 : //
631 : // The Postgres Neon extension is implemented in such a way that the db is
632 : // de-registered in the control plane metadata only after it is dropped. So
633 : // there is a chance that the control plane still thinks that the db should
634 : // exist. This means that it will be re-created by the
635 : // `CreateAndAlterDatabases` phase. This is fine, as the user can just drop
636 : // the database again (in vanilla Postgres they would need to do the same).
637 0 : let operations = keys
638 0 : .into_iter()
639 0 : .filter_map(move |dbname| ctx.dbs.remove(&dbname))
640 0 : .map(|db| Operation {
641 0 : query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()),
642 0 : comment: Some(format!("Dropping invalid database {}", db.name)),
643 0 : });
644 :
645 0 : Ok(Box::new(operations))
646 : }
647 : ApplySpecPhase::RenameRoles => {
648 0 : let mut ctx = ctx.write().await;
649 :
650 0 : let operations = spec
651 0 : .delta_operations
652 0 : .iter()
653 0 : .flatten()
654 0 : .filter(|op| op.action == "rename_role")
655 0 : .filter_map(move |op| {
656 0 : let roles = &mut ctx.roles;
657 :
658 0 : if roles.contains_key(op.name.as_str()) {
659 0 : None
660 : } else {
661 0 : let new_name = op.new_name.as_ref().unwrap();
662 0 : let mut role = roles.remove(op.name.as_str()).unwrap();
663 :
664 0 : role.name = new_name.clone();
665 0 : role.encrypted_password = None;
666 0 : roles.insert(role.name.clone(), role);
667 :
668 0 : Some(Operation {
669 0 : query: format!(
670 0 : "ALTER ROLE {} RENAME TO {}",
671 0 : op.name.pg_quote(),
672 0 : new_name.pg_quote()
673 0 : ),
674 0 : comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)),
675 0 : })
676 : }
677 0 : });
678 :
679 0 : Ok(Box::new(operations))
680 : }
681 : ApplySpecPhase::CreateAndAlterRoles => {
682 0 : let mut ctx = ctx.write().await;
683 :
684 0 : let operations = spec.cluster.roles
685 0 : .iter()
686 0 : .filter_map(move |role| {
687 0 : let roles = &mut ctx.roles;
688 0 : let db_role = roles.get(&role.name);
689 :
690 0 : match db_role {
691 0 : Some(db_role) => {
692 0 : if db_role.encrypted_password != role.encrypted_password {
693 : // This can be run on /every/ role! Not just ones created through the console.
694 : // This means that if you add some funny ALTER here that adds a permission,
695 : // this will get run even on user-created roles! This will result in different
696 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
697 : // now only grants LOGIN and changes the password. Please do not allow this branch
698 : // to do anything silly.
699 0 : Some(Operation {
700 0 : query: format!(
701 0 : "ALTER ROLE {} {}",
702 0 : role.name.pg_quote(),
703 0 : role.to_pg_options(),
704 0 : ),
705 0 : comment: None,
706 0 : })
707 : } else {
708 0 : None
709 : }
710 : }
711 : None => {
712 0 : let query = if !jwks_roles.contains(role.name.as_str()) {
713 0 : format!(
714 0 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE {} {}",
715 0 : role.name.pg_quote(),
716 : params.privileged_role_name,
717 0 : role.to_pg_options(),
718 : )
719 : } else {
720 0 : format!(
721 0 : "CREATE ROLE {} {}",
722 0 : role.name.pg_quote(),
723 0 : role.to_pg_options(),
724 : )
725 : };
726 0 : Some(Operation {
727 0 : query,
728 0 : comment: Some(format!("creating role {}", role.name)),
729 0 : })
730 : }
731 : }
732 0 : });
733 :
734 0 : Ok(Box::new(operations))
735 : }
736 : ApplySpecPhase::RenameAndDeleteDatabases => {
737 0 : let mut ctx = ctx.write().await;
738 :
739 0 : let operations = spec
740 0 : .delta_operations
741 0 : .iter()
742 0 : .flatten()
743 0 : .filter_map(move |op| {
744 0 : let databases = &mut ctx.dbs;
745 0 : match op.action.as_str() {
746 : // We do not check whether the DB exists or not,
747 : // Postgres will take care of it for us
748 0 : "delete_db" => {
749 0 : let (db_name, outer_tag) = op.name.pg_quote_dollar();
750 : // In Postgres we can't drop a database if it is a template.
751 : // So we need to unset the template flag first, but it could
752 : // be a retry, so we could've already dropped the database.
753 : // Check that database exists first to make it idempotent.
754 0 : let unset_template_query: String = format!(
755 0 : include_str!("sql/unset_template_for_drop_dbs.sql"),
756 : datname = db_name,
757 : outer_tag = outer_tag,
758 : );
759 :
760 : // Use FORCE to drop database even if there are active connections.
761 : // We run this from `cloud_admin`, so it should have enough privileges.
762 : //
763 : // NB: there could be other db states, which prevent us from dropping
764 : // the database. For example, if db is used by any active subscription
765 : // or replication slot.
766 : // Such cases are handled in the DropLogicalSubscriptions
767 : // phase. We do all the cleanup before actually dropping the database.
768 0 : let drop_db_query: String = format!(
769 0 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
770 0 : &op.name.pg_quote()
771 : );
772 :
773 0 : databases.remove(&op.name);
774 :
775 0 : Some(vec![
776 0 : Operation {
777 0 : query: unset_template_query,
778 0 : comment: Some(format!(
779 0 : "optionally clearing template flags for DB {}",
780 0 : op.name,
781 0 : )),
782 0 : },
783 0 : Operation {
784 0 : query: drop_db_query,
785 0 : comment: Some(format!("deleting database {}", op.name,)),
786 0 : },
787 0 : ])
788 : }
789 0 : "rename_db" => {
790 0 : if let Some(mut db) = databases.remove(&op.name) {
791 : // update state of known databases
792 0 : let new_name = op.new_name.as_ref().unwrap();
793 0 : db.name = new_name.clone();
794 0 : databases.insert(db.name.clone(), db);
795 :
796 0 : Some(vec![Operation {
797 0 : query: format!(
798 0 : "ALTER DATABASE {} RENAME TO {}",
799 0 : op.name.pg_quote(),
800 0 : new_name.pg_quote(),
801 0 : ),
802 0 : comment: Some(format!(
803 0 : "renaming database '{}' to '{}'",
804 0 : op.name, new_name
805 0 : )),
806 0 : }])
807 : } else {
808 0 : None
809 : }
810 : }
811 0 : _ => None,
812 : }
813 0 : })
814 0 : .flatten();
815 :
816 0 : Ok(Box::new(operations))
817 : }
818 : ApplySpecPhase::CreateAndAlterDatabases => {
819 0 : let mut ctx = ctx.write().await;
820 :
821 0 : let operations = spec
822 0 : .cluster
823 0 : .databases
824 0 : .iter()
825 0 : .filter_map(move |db| {
826 0 : let databases = &mut ctx.dbs;
827 0 : if let Some(edb) = databases.get_mut(&db.name) {
828 0 : let change_owner = if edb.owner.starts_with('"') {
829 0 : db.owner.pg_quote() != edb.owner
830 : } else {
831 0 : db.owner != edb.owner
832 : };
833 :
834 0 : edb.owner = db.owner.clone();
835 :
836 0 : if change_owner {
837 0 : Some(vec![Operation {
838 0 : query: format!(
839 0 : "ALTER DATABASE {} OWNER TO {}",
840 0 : db.name.pg_quote(),
841 0 : db.owner.pg_quote()
842 0 : ),
843 0 : comment: Some(format!(
844 0 : "changing database owner of database {} to {}",
845 0 : db.name, db.owner
846 0 : )),
847 0 : }])
848 : } else {
849 0 : None
850 : }
851 : } else {
852 0 : databases.insert(db.name.clone(), db.clone());
853 :
854 0 : Some(vec![
855 0 : Operation {
856 0 : query: format!(
857 0 : "CREATE DATABASE {} {}",
858 0 : db.name.pg_quote(),
859 0 : db.to_pg_options(),
860 0 : ),
861 0 : comment: None,
862 0 : },
863 0 : Operation {
864 0 : // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on the database
865 0 : // (see https://www.postgresql.org/docs/current/ddl-priv.html)
866 0 : query: format!(
867 0 : "GRANT ALL PRIVILEGES ON DATABASE {} TO {}",
868 0 : db.name.pg_quote(),
869 0 : params.privileged_role_name
870 0 : ),
871 0 : comment: None,
872 0 : },
873 0 : ])
874 : }
875 0 : })
876 0 : .flatten();
877 :
878 0 : Ok(Box::new(operations))
879 : }
880 0 : ApplySpecPhase::CreateSchemaNeon => Ok(Box::new(once(Operation {
881 0 : query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
882 0 : comment: Some(String::from(
883 0 : "create schema for neon extension and utils tables",
884 0 : )),
885 0 : }))),
886 0 : ApplySpecPhase::RunInEachDatabase { db, subphase } => {
887 : // Do some checks that user DB exists and we can access it.
888 : //
889 : // During the phases like DropLogicalSubscriptions, DeleteDBRoleReferences,
890 : // which happen before dropping the DB, the current run could be a retry,
891 : // so it's a valid case when DB is absent already. The case of
892 : // `pg_database.datallowconn = false`/`restrict_conn` is a bit tricky, as
893 : // in theory user can have some dangling objects there, so we will fail at
894 : // the actual drop later. Yet, to fix that in the current code we would need
895 : // to ALTER DATABASE, and then check back, but that even more invasive, so
896 : // that's not what we really want to do here.
897 : //
898 : // For ChangeSchemaPerms, skipping DBs we cannot access is totally fine.
899 0 : if let DB::UserDB(db) = db {
900 0 : let databases = &ctx.read().await.dbs;
901 :
902 0 : let edb = match databases.get(&db.name) {
903 0 : Some(edb) => edb,
904 : None => {
905 0 : warn!(
906 0 : "skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL",
907 : subphase, db.name
908 : );
909 0 : return Ok(Box::new(empty()));
910 : }
911 : };
912 :
913 0 : if edb.restrict_conn || edb.invalid {
914 0 : warn!(
915 0 : "skipping RunInEachDatabase phase {:?}, database {} is (restrict_conn={}, invalid={})",
916 : subphase, db.name, edb.restrict_conn, edb.invalid
917 : );
918 0 : return Ok(Box::new(empty()));
919 0 : }
920 0 : }
921 :
922 0 : match subphase {
923 : PerDatabasePhase::DropLogicalSubscriptions => {
924 0 : match &db {
925 0 : DB::UserDB(db) => {
926 0 : let (db_name, outer_tag) = db.name.pg_quote_dollar();
927 0 : let drop_subscription_query: String = format!(
928 0 : include_str!("sql/drop_subscriptions.sql"),
929 : datname_str = db_name,
930 : outer_tag = outer_tag,
931 : );
932 :
933 0 : let operations = vec![Operation {
934 0 : query: drop_subscription_query,
935 0 : comment: Some(format!(
936 0 : "optionally dropping subscriptions for DB {}",
937 0 : db.name,
938 0 : )),
939 0 : }]
940 0 : .into_iter();
941 :
942 0 : Ok(Box::new(operations))
943 : }
944 : // skip this cleanup for the system databases
945 : // because users can't drop them
946 0 : DB::SystemDB => Ok(Box::new(empty())),
947 : }
948 : }
949 : PerDatabasePhase::DeleteDBRoleReferences => {
950 0 : let ctx = ctx.read().await;
951 :
952 0 : let operations = spec
953 0 : .delta_operations
954 0 : .iter()
955 0 : .flatten()
956 0 : .filter(|op| op.action == "delete_role")
957 0 : .filter_map(move |op| {
958 0 : if db.is_owned_by(&op.name) {
959 0 : return None;
960 0 : }
961 0 : if !ctx.roles.contains_key(&op.name) {
962 0 : return None;
963 0 : }
964 0 : let quoted = op.name.pg_quote();
965 0 : let new_owner = match &db {
966 0 : DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
967 0 : DB::UserDB(db) => db.owner.pg_quote(),
968 : };
969 0 : let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
970 :
971 0 : Some(vec![
972 0 : // This will reassign all dependent objects to the db owner
973 0 : Operation {
974 0 : query: format!("REASSIGN OWNED BY {quoted} TO {new_owner}",),
975 0 : comment: None,
976 0 : },
977 0 : // Revoke some potentially blocking privileges (Neon-specific currently)
978 0 : Operation {
979 0 : query: format!(
980 0 : include_str!("sql/pre_drop_role_revoke_privileges.sql"),
981 0 : // N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
982 0 : role_name = escaped_role,
983 0 : outer_tag = outer_tag,
984 0 : ),
985 0 : comment: None,
986 0 : },
987 0 : // This now will only drop privileges of the role
988 0 : // TODO: this is obviously not 100% true because of the above case,
989 0 : // there could be still some privileges that are not revoked. Maybe this
990 0 : // only drops privileges that were granted *by this* role, not *to this* role,
991 0 : // but this has to be checked.
992 0 : Operation {
993 0 : query: format!("DROP OWNED BY {quoted}"),
994 0 : comment: None,
995 0 : },
996 0 : ])
997 0 : })
998 0 : .flatten();
999 :
1000 0 : Ok(Box::new(operations))
1001 : }
1002 : PerDatabasePhase::ChangeSchemaPerms => {
1003 0 : let db = match &db {
1004 : // ignore schema permissions on the system database
1005 0 : DB::SystemDB => return Ok(Box::new(empty())),
1006 0 : DB::UserDB(db) => db,
1007 : };
1008 0 : let (db_owner, outer_tag) = db.owner.pg_quote_dollar();
1009 :
1010 0 : let operations = vec![
1011 0 : Operation {
1012 0 : query: format!(
1013 0 : include_str!("sql/set_public_schema_owner.sql"),
1014 0 : db_owner = db_owner,
1015 0 : outer_tag = outer_tag,
1016 0 : ),
1017 0 : comment: None,
1018 0 : },
1019 0 : Operation {
1020 0 : query: String::from(include_str!("sql/default_grants.sql")),
1021 0 : comment: None,
1022 0 : },
1023 : ]
1024 0 : .into_iter();
1025 :
1026 0 : Ok(Box::new(operations))
1027 : }
1028 : }
1029 : }
1030 : // Interestingly, we only install pg_stat_statements in the main database,
1031 : // even when it's preloaded.
1032 : ApplySpecPhase::HandleOtherExtensions => {
1033 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1034 0 : if libs.contains("pg_stat_statements") {
1035 0 : return Ok(Box::new(once(Operation {
1036 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
1037 0 : comment: Some(String::from("create system extensions")),
1038 0 : })));
1039 0 : }
1040 0 : }
1041 0 : Ok(Box::new(empty()))
1042 : }
1043 0 : ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
1044 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit"),
1045 0 : comment: Some(String::from("create pgaudit extensions")),
1046 0 : }))),
1047 0 : ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
1048 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS pgauditlogtofile"),
1049 0 : comment: Some(String::from("create pgauditlogtofile extensions")),
1050 0 : }))),
1051 : // Disable pgaudit logging for the `postgres` database.
1052 : // `postgres` is the Neon system database used by monitors
1053 : // and compute_ctl tuning functions, and thus generates a lot of noise.
1054 : // We do not consider data stored in this database to be sensitive.
1055 : ApplySpecPhase::DisablePostgresDBPgAudit => {
1056 0 : let query = "ALTER DATABASE postgres SET pgaudit.log to 'none'";
1057 0 : Ok(Box::new(once(Operation {
1058 0 : query: query.to_string(),
1059 0 : comment: Some(query.to_string()),
1060 0 : })))
1061 : }
1062 : ApplySpecPhase::HandleNeonExtension => {
1063 0 : let operations = vec![
1064 0 : Operation {
1065 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
1066 0 : comment: Some(String::from(
1067 0 : "init: install the extension if not already installed",
1068 0 : )),
1069 0 : },
1070 0 : Operation {
1071 0 : query: String::from(
1072 0 : "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
1073 0 : ),
1074 0 : comment: Some(String::from("compat/fix: make neon relocatable")),
1075 0 : },
1076 0 : Operation {
1077 0 : query: String::from("ALTER EXTENSION neon SET SCHEMA neon"),
1078 0 : comment: Some(String::from("compat/fix: alter neon extension schema")),
1079 0 : },
1080 0 : Operation {
1081 0 : query: String::from("ALTER EXTENSION neon UPDATE"),
1082 0 : comment: Some(String::from("compat/update: update neon extension version")),
1083 0 : },
1084 : ]
1085 0 : .into_iter();
1086 :
1087 0 : Ok(Box::new(operations))
1088 : }
1089 0 : ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
1090 0 : query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
1091 0 : comment: None,
1092 0 : }))),
1093 : ApplySpecPhase::DropRoles => {
1094 0 : let operations = spec
1095 0 : .delta_operations
1096 0 : .iter()
1097 0 : .flatten()
1098 0 : .filter(|op| op.action == "delete_role")
1099 0 : .map(|op| Operation {
1100 0 : query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()),
1101 0 : comment: None,
1102 0 : });
1103 :
1104 0 : Ok(Box::new(operations))
1105 : }
1106 0 : ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
1107 0 : query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
1108 0 : comment: None,
1109 0 : }))),
1110 : }
1111 0 : }
|