Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::fmt::{Debug, Formatter};
3 : use std::future::Future;
4 : use std::iter::empty;
5 : use std::iter::once;
6 : use std::sync::Arc;
7 :
8 : use crate::compute::construct_superuser_query;
9 : use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
10 : use anyhow::{bail, Result};
11 : use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
12 : use futures::future::join_all;
13 : use tokio::sync::RwLock;
14 : use tokio_postgres::Client;
15 : use tracing::{debug, info_span, Instrument};
16 :
17 : #[derive(Clone)]
18 : pub enum DB {
19 : SystemDB,
20 : UserDB(Database),
21 : }
22 :
23 : impl DB {
24 0 : pub fn new(db: Database) -> DB {
25 0 : Self::UserDB(db)
26 0 : }
27 :
28 0 : pub fn is_owned_by(&self, role: &PgIdent) -> bool {
29 0 : match self {
30 0 : DB::SystemDB => false,
31 0 : DB::UserDB(db) => &db.owner == role,
32 : }
33 0 : }
34 : }
35 :
36 : impl Debug for DB {
37 0 : fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38 0 : match self {
39 0 : DB::SystemDB => f.debug_tuple("SystemDB").finish(),
40 0 : DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(),
41 : }
42 0 : }
43 : }
44 :
45 : #[derive(Copy, Clone, Debug)]
46 : pub enum PerDatabasePhase {
47 : DeleteDBRoleReferences,
48 : ChangeSchemaPerms,
49 : HandleAnonExtension,
50 : DropLogicalSubscriptions,
51 : }
52 :
53 : #[derive(Clone, Debug)]
54 : pub enum ApplySpecPhase {
55 : CreateSuperUser,
56 : DropInvalidDatabases,
57 : RenameRoles,
58 : CreateAndAlterRoles,
59 : RenameAndDeleteDatabases,
60 : CreateAndAlterDatabases,
61 : CreateSchemaNeon,
62 : RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
63 : HandleOtherExtensions,
64 : HandleNeonExtension,
65 : CreateAvailabilityCheck,
66 : DropRoles,
67 : FinalizeDropLogicalSubscriptions,
68 : }
69 :
70 : pub struct Operation {
71 : pub query: String,
72 : pub comment: Option<String>,
73 : }
74 :
75 : pub struct MutableApplyContext {
76 : pub roles: HashMap<String, Role>,
77 : pub dbs: HashMap<String, Database>,
78 : }
79 :
80 : /// Apply the operations that belong to the given spec apply phase.
81 : ///
82 : /// Commands within a single phase are executed in order of Iterator yield.
83 : /// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
84 : /// indicated by its `db` field, and can share a single client for all changes
85 : /// to that database.
86 : ///
87 : /// Notes:
88 : /// - Commands are pipelined, and thus may cause incomplete apply if one
89 : /// command of many fails.
90 : /// - Failing commands will fail the phase's apply step once the return value
91 : /// is processed.
92 : /// - No timeouts have (yet) been implemented.
93 : /// - The caller is responsible for limiting and/or applying concurrency.
94 0 : pub async fn apply_operations<'a, Fut, F>(
95 0 : spec: Arc<ComputeSpec>,
96 0 : ctx: Arc<RwLock<MutableApplyContext>>,
97 0 : jwks_roles: Arc<HashSet<String>>,
98 0 : apply_spec_phase: ApplySpecPhase,
99 0 : client: F,
100 0 : ) -> Result<()>
101 0 : where
102 0 : F: FnOnce() -> Fut,
103 0 : Fut: Future<Output = Result<&'a Client>>,
104 0 : {
105 0 : debug!("Starting phase {:?}", &apply_spec_phase);
106 0 : let span = info_span!("db_apply_changes", phase=?apply_spec_phase);
107 0 : let span2 = span.clone();
108 0 : async move {
109 0 : debug!("Processing phase {:?}", &apply_spec_phase);
110 0 : let ctx = ctx;
111 :
112 0 : let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase)
113 0 : .await?
114 0 : .peekable();
115 0 :
116 0 : // Return (and by doing so, skip requesting the PostgreSQL client) if
117 0 : // we don't have any operations scheduled.
118 0 : if ops.peek().is_none() {
119 0 : return Ok(());
120 0 : }
121 :
122 0 : let client = client().await?;
123 :
124 0 : debug!("Applying phase {:?}", &apply_spec_phase);
125 :
126 0 : let active_queries = ops
127 0 : .map(|op| {
128 0 : let Operation { comment, query } = op;
129 0 : let inspan = match comment {
130 0 : None => span.clone(),
131 0 : Some(comment) => info_span!("phase {}: {}", comment),
132 : };
133 :
134 0 : async {
135 0 : let query = query;
136 0 : let res = client.simple_query(&query).await;
137 0 : debug!(
138 0 : "{} {}",
139 0 : if res.is_ok() {
140 0 : "successfully executed"
141 : } else {
142 0 : "failed to execute"
143 : },
144 : query
145 : );
146 0 : res
147 0 : }
148 0 : .instrument(inspan)
149 0 : })
150 0 : .collect::<Vec<_>>();
151 0 :
152 0 : drop(ctx);
153 :
154 0 : for it in join_all(active_queries).await {
155 0 : drop(it?);
156 : }
157 :
158 0 : debug!("Completed phase {:?}", &apply_spec_phase);
159 :
160 0 : Ok(())
161 0 : }
162 0 : .instrument(span2)
163 0 : .await
164 0 : }
165 :
166 : /// Create a stream of operations to be executed for that phase of applying
167 : /// changes.
168 : ///
169 : /// In the future we may generate a single stream of changes and then
170 : /// sort/merge/batch execution, but for now this is a nice way to improve
171 : /// batching behaviour of the commands.
172 0 : async fn get_operations<'a>(
173 0 : spec: &'a ComputeSpec,
174 0 : ctx: &'a RwLock<MutableApplyContext>,
175 0 : jwks_roles: &'a HashSet<String>,
176 0 : apply_spec_phase: &'a ApplySpecPhase,
177 0 : ) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
178 0 : match apply_spec_phase {
179 : ApplySpecPhase::CreateSuperUser => {
180 0 : let query = construct_superuser_query(spec);
181 0 :
182 0 : Ok(Box::new(once(Operation {
183 0 : query,
184 0 : comment: None,
185 0 : })))
186 : }
187 : ApplySpecPhase::DropInvalidDatabases => {
188 0 : let mut ctx = ctx.write().await;
189 0 : let databases = &mut ctx.dbs;
190 0 :
191 0 : let keys: Vec<_> = databases
192 0 : .iter()
193 0 : .filter(|(_, db)| db.invalid)
194 0 : .map(|(dbname, _)| dbname.clone())
195 0 : .collect();
196 0 :
197 0 : // After recent commit in Postgres, interrupted DROP DATABASE
198 0 : // leaves the database in the invalid state. According to the
199 0 : // commit message, the only option for user is to drop it again.
200 0 : // See:
201 0 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
202 0 : //
203 0 : // Postgres Neon extension is done the way, that db is de-registered
204 0 : // in the control plane metadata only after it is dropped. So there is
205 0 : // a chance that it still thinks that the db should exist. This means
206 0 : // that it will be re-created by the `CreateDatabases` phase. This
207 0 : // is fine, as user can just drop the table again (in vanilla
208 0 : // Postgres they would need to do the same).
209 0 : let operations = keys
210 0 : .into_iter()
211 0 : .filter_map(move |dbname| ctx.dbs.remove(&dbname))
212 0 : .map(|db| Operation {
213 0 : query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()),
214 0 : comment: Some(format!("Dropping invalid database {}", db.name)),
215 0 : });
216 0 :
217 0 : Ok(Box::new(operations))
218 : }
219 : ApplySpecPhase::RenameRoles => {
220 0 : let mut ctx = ctx.write().await;
221 :
222 0 : let operations = spec
223 0 : .delta_operations
224 0 : .iter()
225 0 : .flatten()
226 0 : .filter(|op| op.action == "rename_role")
227 0 : .filter_map(move |op| {
228 0 : let roles = &mut ctx.roles;
229 0 :
230 0 : if roles.contains_key(op.name.as_str()) {
231 0 : None
232 : } else {
233 0 : let new_name = op.new_name.as_ref().unwrap();
234 0 : let mut role = roles.remove(op.name.as_str()).unwrap();
235 0 :
236 0 : role.name = new_name.clone();
237 0 : role.encrypted_password = None;
238 0 : roles.insert(role.name.clone(), role);
239 0 :
240 0 : Some(Operation {
241 0 : query: format!(
242 0 : "ALTER ROLE {} RENAME TO {}",
243 0 : op.name.pg_quote(),
244 0 : new_name.pg_quote()
245 0 : ),
246 0 : comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)),
247 0 : })
248 : }
249 0 : });
250 0 :
251 0 : Ok(Box::new(operations))
252 : }
253 : ApplySpecPhase::CreateAndAlterRoles => {
254 0 : let mut ctx = ctx.write().await;
255 :
256 0 : let operations = spec.cluster.roles
257 0 : .iter()
258 0 : .filter_map(move |role| {
259 0 : let roles = &mut ctx.roles;
260 0 : let db_role = roles.get(&role.name);
261 0 :
262 0 : match db_role {
263 0 : Some(db_role) => {
264 0 : if db_role.encrypted_password != role.encrypted_password {
265 : // This can be run on /every/ role! Not just ones created through the console.
266 : // This means that if you add some funny ALTER here that adds a permission,
267 : // this will get run even on user-created roles! This will result in different
268 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
269 : // now only grants LOGIN and changes the password. Please do not allow this branch
270 : // to do anything silly.
271 0 : Some(Operation {
272 0 : query: format!(
273 0 : "ALTER ROLE {} {}",
274 0 : role.name.pg_quote(),
275 0 : role.to_pg_options(),
276 0 : ),
277 0 : comment: None,
278 0 : })
279 : } else {
280 0 : None
281 : }
282 : }
283 : None => {
284 0 : let query = if !jwks_roles.contains(role.name.as_str()) {
285 0 : format!(
286 0 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}",
287 0 : role.name.pg_quote(),
288 0 : role.to_pg_options(),
289 0 : )
290 : } else {
291 0 : format!(
292 0 : "CREATE ROLE {} {}",
293 0 : role.name.pg_quote(),
294 0 : role.to_pg_options(),
295 0 : )
296 : };
297 0 : Some(Operation {
298 0 : query,
299 0 : comment: Some(format!("creating role {}", role.name)),
300 0 : })
301 : }
302 : }
303 0 : });
304 0 :
305 0 : Ok(Box::new(operations))
306 : }
307 : ApplySpecPhase::RenameAndDeleteDatabases => {
308 0 : let mut ctx = ctx.write().await;
309 :
310 0 : let operations = spec
311 0 : .delta_operations
312 0 : .iter()
313 0 : .flatten()
314 0 : .filter_map(move |op| {
315 0 : let databases = &mut ctx.dbs;
316 0 : match op.action.as_str() {
317 0 : // We do not check whether the DB exists or not,
318 0 : // Postgres will take care of it for us
319 0 : "delete_db" => {
320 : // In Postgres we can't drop a database if it is a template.
321 : // So we need to unset the template flag first, but it could
322 : // be a retry, so we could've already dropped the database.
323 : // Check that database exists first to make it idempotent.
324 0 : let unset_template_query: String = format!(
325 0 : include_str!("sql/unset_template_for_drop_dbs.sql"),
326 0 : datname_str = escape_literal(&op.name),
327 0 : datname = &op.name.pg_quote()
328 0 : );
329 0 :
330 0 : // Use FORCE to drop database even if there are active connections.
331 0 : // We run this from `cloud_admin`, so it should have enough privileges.
332 0 : //
333 0 : // NB: there could be other db states, which prevent us from dropping
334 0 : // the database. For example, if db is used by any active subscription
335 0 : // or replication slot.
336 0 : // Such cases are handled in the DropLogicalSubscriptions
337 0 : // phase. We do all the cleanup before actually dropping the database.
338 0 : let drop_db_query: String = format!(
339 0 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
340 0 : &op.name.pg_quote()
341 0 : );
342 0 :
343 0 : databases.remove(&op.name);
344 0 :
345 0 : Some(vec![
346 0 : Operation {
347 0 : query: unset_template_query,
348 0 : comment: Some(format!(
349 0 : "optionally clearing template flags for DB {}",
350 0 : op.name,
351 0 : )),
352 0 : },
353 0 : Operation {
354 0 : query: drop_db_query,
355 0 : comment: Some(format!("deleting database {}", op.name,)),
356 0 : },
357 0 : ])
358 : }
359 0 : "rename_db" => {
360 0 : if let Some(mut db) = databases.remove(&op.name) {
361 : // update state of known databases
362 0 : let new_name = op.new_name.as_ref().unwrap();
363 0 : db.name = new_name.clone();
364 0 : databases.insert(db.name.clone(), db);
365 0 :
366 0 : Some(vec![Operation {
367 0 : query: format!(
368 0 : "ALTER DATABASE {} RENAME TO {}",
369 0 : op.name.pg_quote(),
370 0 : new_name.pg_quote(),
371 0 : ),
372 0 : comment: Some(format!(
373 0 : "renaming database '{}' to '{}'",
374 0 : op.name, new_name
375 0 : )),
376 0 : }])
377 : } else {
378 0 : None
379 : }
380 : }
381 0 : _ => None,
382 : }
383 0 : })
384 0 : .flatten();
385 0 :
386 0 : Ok(Box::new(operations))
387 : }
388 : ApplySpecPhase::CreateAndAlterDatabases => {
389 0 : let mut ctx = ctx.write().await;
390 :
391 0 : let operations = spec
392 0 : .cluster
393 0 : .databases
394 0 : .iter()
395 0 : .filter_map(move |db| {
396 0 : let databases = &mut ctx.dbs;
397 0 : if let Some(edb) = databases.get_mut(&db.name) {
398 0 : let change_owner = if edb.owner.starts_with('"') {
399 0 : db.owner.pg_quote() != edb.owner
400 : } else {
401 0 : db.owner != edb.owner
402 : };
403 :
404 0 : edb.owner = db.owner.clone();
405 0 :
406 0 : if change_owner {
407 0 : Some(vec![Operation {
408 0 : query: format!(
409 0 : "ALTER DATABASE {} OWNER TO {}",
410 0 : db.name.pg_quote(),
411 0 : db.owner.pg_quote()
412 0 : ),
413 0 : comment: Some(format!(
414 0 : "changing database owner of database {} to {}",
415 0 : db.name, db.owner
416 0 : )),
417 0 : }])
418 : } else {
419 0 : None
420 : }
421 : } else {
422 0 : databases.insert(db.name.clone(), db.clone());
423 0 :
424 0 : Some(vec![
425 0 : Operation {
426 0 : query: format!(
427 0 : "CREATE DATABASE {} {}",
428 0 : db.name.pg_quote(),
429 0 : db.to_pg_options(),
430 0 : ),
431 0 : comment: None,
432 0 : },
433 0 : Operation {
434 0 : query: format!(
435 0 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
436 0 : db.name.pg_quote()
437 0 : ),
438 0 : comment: None,
439 0 : },
440 0 : ])
441 : }
442 0 : })
443 0 : .flatten();
444 0 :
445 0 : Ok(Box::new(operations))
446 : }
447 0 : ApplySpecPhase::CreateSchemaNeon => Ok(Box::new(once(Operation {
448 0 : query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
449 0 : comment: Some(String::from(
450 0 : "create schema for neon extension and utils tables",
451 0 : )),
452 0 : }))),
453 0 : ApplySpecPhase::RunInEachDatabase { db, subphase } => {
454 0 : match subphase {
455 : PerDatabasePhase::DropLogicalSubscriptions => {
456 0 : match &db {
457 0 : DB::UserDB(db) => {
458 0 : let drop_subscription_query: String = format!(
459 0 : include_str!("sql/drop_subscriptions.sql"),
460 0 : datname_str = escape_literal(&db.name),
461 0 : );
462 0 :
463 0 : let operations = vec![Operation {
464 0 : query: drop_subscription_query,
465 0 : comment: Some(format!(
466 0 : "optionally dropping subscriptions for DB {}",
467 0 : db.name,
468 0 : )),
469 0 : }]
470 0 : .into_iter();
471 0 :
472 0 : Ok(Box::new(operations))
473 : }
474 : // skip this cleanup for the system databases
475 : // because users can't drop them
476 0 : DB::SystemDB => Ok(Box::new(empty())),
477 : }
478 : }
479 : PerDatabasePhase::DeleteDBRoleReferences => {
480 0 : let ctx = ctx.read().await;
481 :
482 0 : let operations =
483 0 : spec.delta_operations
484 0 : .iter()
485 0 : .flatten()
486 0 : .filter(|op| op.action == "delete_role")
487 0 : .filter_map(move |op| {
488 0 : if db.is_owned_by(&op.name) {
489 0 : return None;
490 0 : }
491 0 : if !ctx.roles.contains_key(&op.name) {
492 0 : return None;
493 0 : }
494 0 : let quoted = op.name.pg_quote();
495 0 : let new_owner = match &db {
496 0 : DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
497 0 : DB::UserDB(db) => db.owner.pg_quote(),
498 : };
499 :
500 0 : Some(vec![
501 0 : // This will reassign all dependent objects to the db owner
502 0 : Operation {
503 0 : query: format!(
504 0 : "REASSIGN OWNED BY {} TO {}",
505 0 : quoted, new_owner,
506 0 : ),
507 0 : comment: None,
508 0 : },
509 0 : // Revoke some potentially blocking privileges (Neon-specific currently)
510 0 : Operation {
511 0 : query: format!(
512 0 : include_str!("sql/pre_drop_role_revoke_privileges.sql"),
513 0 : role_name = quoted,
514 0 : ),
515 0 : comment: None,
516 0 : },
517 0 : // This now will only drop privileges of the role
518 0 : // TODO: this is obviously not 100% true because of the above case,
519 0 : // there could be still some privileges that are not revoked. Maybe this
520 0 : // only drops privileges that were granted *by this* role, not *to this* role,
521 0 : // but this has to be checked.
522 0 : Operation {
523 0 : query: format!("DROP OWNED BY {}", quoted),
524 0 : comment: None,
525 0 : },
526 0 : ])
527 0 : })
528 0 : .flatten();
529 0 :
530 0 : Ok(Box::new(operations))
531 : }
532 : PerDatabasePhase::ChangeSchemaPerms => {
533 0 : let ctx = ctx.read().await;
534 0 : let databases = &ctx.dbs;
535 :
536 0 : let db = match &db {
537 : // ignore schema permissions on the system database
538 0 : DB::SystemDB => return Ok(Box::new(empty())),
539 0 : DB::UserDB(db) => db,
540 0 : };
541 0 :
542 0 : if databases.get(&db.name).is_none() {
543 0 : bail!("database {} doesn't exist in PostgreSQL", db.name);
544 0 : }
545 0 :
546 0 : let edb = databases.get(&db.name).unwrap();
547 0 :
548 0 : if edb.restrict_conn || edb.invalid {
549 0 : return Ok(Box::new(empty()));
550 0 : }
551 0 :
552 0 : let operations = vec![
553 0 : Operation {
554 0 : query: format!(
555 0 : include_str!("sql/set_public_schema_owner.sql"),
556 0 : db_owner = db.owner.pg_quote()
557 0 : ),
558 0 : comment: None,
559 0 : },
560 0 : Operation {
561 0 : query: String::from(include_str!("sql/default_grants.sql")),
562 0 : comment: None,
563 0 : },
564 0 : ]
565 0 : .into_iter();
566 0 :
567 0 : Ok(Box::new(operations))
568 : }
569 : PerDatabasePhase::HandleAnonExtension => {
570 : // Only install Anon into user databases
571 0 : let db = match &db {
572 0 : DB::SystemDB => return Ok(Box::new(empty())),
573 0 : DB::UserDB(db) => db,
574 0 : };
575 0 : // Never install Anon when it's not enabled as feature
576 0 : if !spec.features.contains(&ComputeFeature::AnonExtension) {
577 0 : return Ok(Box::new(empty()));
578 0 : }
579 0 :
580 0 : // Only install Anon when it's added in preload libraries
581 0 : let opt_libs = spec.cluster.settings.find("shared_preload_libraries");
582 :
583 0 : let libs = match opt_libs {
584 0 : Some(libs) => libs,
585 0 : None => return Ok(Box::new(empty())),
586 : };
587 :
588 0 : if !libs.contains("anon") {
589 0 : return Ok(Box::new(empty()));
590 0 : }
591 0 :
592 0 : let db_owner = db.owner.pg_quote();
593 0 :
594 0 : let operations = vec![
595 0 : // Create anon extension if this compute needs it
596 0 : // Users cannot create it themselves, because superuser is required.
597 0 : Operation {
598 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS anon CASCADE"),
599 0 : comment: Some(String::from("creating anon extension")),
600 0 : },
601 0 : // Initialize anon extension
602 0 : // This also requires superuser privileges, so users cannot do it themselves.
603 0 : Operation {
604 0 : query: String::from("SELECT anon.init()"),
605 0 : comment: Some(String::from("initializing anon extension data")),
606 0 : },
607 0 : Operation {
608 0 : query: format!("GRANT ALL ON SCHEMA anon TO {}", db_owner),
609 0 : comment: Some(String::from(
610 0 : "granting anon extension schema permissions",
611 0 : )),
612 0 : },
613 0 : Operation {
614 0 : query: format!(
615 0 : "GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}",
616 0 : db_owner
617 0 : ),
618 0 : comment: Some(String::from(
619 0 : "granting anon extension schema functions permissions",
620 0 : )),
621 0 : },
622 0 : // We need this, because some functions are defined as SECURITY DEFINER.
623 0 : // In Postgres SECURITY DEFINER functions are executed with the privileges
624 0 : // of the owner.
625 0 : // In anon extension this it is needed to access some GUCs, which are only accessible to
626 0 : // superuser. But we've patched postgres to allow db_owner to access them as well.
627 0 : // So we need to change owner of these functions to db_owner.
628 0 : Operation {
629 0 : query: format!(
630 0 : include_str!("sql/anon_ext_fn_reassign.sql"),
631 0 : db_owner = db_owner,
632 0 : ),
633 0 : comment: Some(String::from(
634 0 : "change anon extension functions owner to database_owner",
635 0 : )),
636 0 : },
637 0 : Operation {
638 0 : query: format!(
639 0 : "GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}",
640 0 : db_owner,
641 0 : ),
642 0 : comment: Some(String::from(
643 0 : "granting anon extension tables permissions",
644 0 : )),
645 0 : },
646 0 : Operation {
647 0 : query: format!(
648 0 : "GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}",
649 0 : db_owner,
650 0 : ),
651 0 : comment: Some(String::from(
652 0 : "granting anon extension sequences permissions",
653 0 : )),
654 0 : },
655 0 : ]
656 0 : .into_iter();
657 0 :
658 0 : Ok(Box::new(operations))
659 : }
660 : }
661 : }
662 : // Interestingly, we only install p_s_s in the main database, even when
663 : // it's preloaded.
664 : ApplySpecPhase::HandleOtherExtensions => {
665 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
666 0 : if libs.contains("pg_stat_statements") {
667 0 : return Ok(Box::new(once(Operation {
668 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
669 0 : comment: Some(String::from("create system extensions")),
670 0 : })));
671 0 : }
672 0 : }
673 0 : Ok(Box::new(empty()))
674 : }
675 : ApplySpecPhase::HandleNeonExtension => {
676 0 : let operations = vec![
677 0 : Operation {
678 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
679 0 : comment: Some(String::from(
680 0 : "init: install the extension if not already installed",
681 0 : )),
682 0 : },
683 0 : Operation {
684 0 : query: String::from(
685 0 : "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
686 0 : ),
687 0 : comment: Some(String::from("compat/fix: make neon relocatable")),
688 0 : },
689 0 : Operation {
690 0 : query: String::from("ALTER EXTENSION neon SET SCHEMA neon"),
691 0 : comment: Some(String::from("compat/fix: alter neon extension schema")),
692 0 : },
693 0 : Operation {
694 0 : query: String::from("ALTER EXTENSION neon UPDATE"),
695 0 : comment: Some(String::from("compat/update: update neon extension version")),
696 0 : },
697 0 : ]
698 0 : .into_iter();
699 0 :
700 0 : Ok(Box::new(operations))
701 : }
702 0 : ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
703 0 : query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
704 0 : comment: None,
705 0 : }))),
706 : ApplySpecPhase::DropRoles => {
707 0 : let operations = spec
708 0 : .delta_operations
709 0 : .iter()
710 0 : .flatten()
711 0 : .filter(|op| op.action == "delete_role")
712 0 : .map(|op| Operation {
713 0 : query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()),
714 0 : comment: None,
715 0 : });
716 0 :
717 0 : Ok(Box::new(operations))
718 : }
719 0 : ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
720 0 : query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
721 0 : comment: None,
722 0 : }))),
723 : }
724 0 : }
|