Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::fmt::{Debug, Formatter};
3 : use std::future::Future;
4 : use std::iter::empty;
5 : use std::iter::once;
6 : use std::sync::Arc;
7 :
8 : use crate::compute::construct_superuser_query;
9 : use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
10 : use anyhow::{bail, Result};
11 : use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
12 : use futures::future::join_all;
13 : use tokio::sync::RwLock;
14 : use tokio_postgres::Client;
15 : use tracing::{debug, info_span, Instrument};
16 :
17 : #[derive(Clone)]
18 : pub enum DB {
19 : SystemDB,
20 : UserDB(Database),
21 : }
22 :
23 : impl DB {
24 0 : pub fn new(db: Database) -> DB {
25 0 : Self::UserDB(db)
26 0 : }
27 :
28 0 : pub fn is_owned_by(&self, role: &PgIdent) -> bool {
29 0 : match self {
30 0 : DB::SystemDB => false,
31 0 : DB::UserDB(db) => &db.owner == role,
32 : }
33 0 : }
34 : }
35 :
36 : impl Debug for DB {
37 0 : fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38 0 : match self {
39 0 : DB::SystemDB => f.debug_tuple("SystemDB").finish(),
40 0 : DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(),
41 : }
42 0 : }
43 : }
44 :
/// Sub-phases of [`ApplySpecPhase::RunInEachDatabase`], i.e. work that is
/// generated separately for each target database.
#[derive(Copy, Clone, Debug)]
pub enum PerDatabasePhase {
    /// Reassign and drop objects owned by roles that are about to be deleted.
    DeleteDBRoleReferences,
    /// Set the owner of the public schema and apply the default grants.
    ChangeSchemaPerms,
    /// Install and configure the `anon` extension when it is enabled.
    HandleAnonExtension,
    /// Clean up subscriptions before a database is dropped.
    DropSubscriptionsForDeletedDatabases,
}
52 :
53 : #[derive(Clone, Debug)]
54 : pub enum ApplySpecPhase {
55 : CreateSuperUser,
56 : DropInvalidDatabases,
57 : RenameRoles,
58 : CreateAndAlterRoles,
59 : RenameAndDeleteDatabases,
60 : CreateAndAlterDatabases,
61 : RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
62 : HandleOtherExtensions,
63 : HandleNeonExtension,
64 : CreateAvailabilityCheck,
65 : DropRoles,
66 : }
67 :
/// A single SQL statement to execute, plus an optional human-readable
/// description used for tracing.
pub struct Operation {
    /// The SQL text sent to PostgreSQL.
    pub query: String,
    /// Optional description of what the query does.
    pub comment: Option<String>,
}
72 :
73 : pub struct MutableApplyContext {
74 : pub roles: HashMap<String, Role>,
75 : pub dbs: HashMap<String, Database>,
76 : }
77 :
78 : /// Apply the operations that belong to the given spec apply phase.
79 : ///
80 : /// Commands within a single phase are executed in order of Iterator yield.
81 : /// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
82 : /// indicated by its `db` field, and can share a single client for all changes
83 : /// to that database.
84 : ///
85 : /// Notes:
86 : /// - Commands are pipelined, and thus may cause incomplete apply if one
87 : /// command of many fails.
88 : /// - Failing commands will fail the phase's apply step once the return value
89 : /// is processed.
90 : /// - No timeouts have (yet) been implemented.
91 : /// - The caller is responsible for limiting and/or applying concurrency.
92 0 : pub async fn apply_operations<'a, Fut, F>(
93 0 : spec: Arc<ComputeSpec>,
94 0 : ctx: Arc<RwLock<MutableApplyContext>>,
95 0 : jwks_roles: Arc<HashSet<String>>,
96 0 : apply_spec_phase: ApplySpecPhase,
97 0 : client: F,
98 0 : ) -> Result<()>
99 0 : where
100 0 : F: FnOnce() -> Fut,
101 0 : Fut: Future<Output = Result<&'a Client>>,
102 0 : {
103 0 : debug!("Starting phase {:?}", &apply_spec_phase);
104 0 : let span = info_span!("db_apply_changes", phase=?apply_spec_phase);
105 0 : let span2 = span.clone();
106 0 : async move {
107 0 : debug!("Processing phase {:?}", &apply_spec_phase);
108 0 : let ctx = ctx;
109 :
110 0 : let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase)
111 0 : .await?
112 0 : .peekable();
113 0 :
114 0 : // Return (and by doing so, skip requesting the PostgreSQL client) if
115 0 : // we don't have any operations scheduled.
116 0 : if ops.peek().is_none() {
117 0 : return Ok(());
118 0 : }
119 :
120 0 : let client = client().await?;
121 :
122 0 : debug!("Applying phase {:?}", &apply_spec_phase);
123 :
124 0 : let active_queries = ops
125 0 : .map(|op| {
126 0 : let Operation { comment, query } = op;
127 0 : let inspan = match comment {
128 0 : None => span.clone(),
129 0 : Some(comment) => info_span!("phase {}: {}", comment),
130 : };
131 :
132 0 : async {
133 0 : let query = query;
134 0 : let res = client.simple_query(&query).await;
135 0 : debug!(
136 0 : "{} {}",
137 0 : if res.is_ok() {
138 0 : "successfully executed"
139 : } else {
140 0 : "failed to execute"
141 : },
142 : query
143 : );
144 0 : res
145 0 : }
146 0 : .instrument(inspan)
147 0 : })
148 0 : .collect::<Vec<_>>();
149 0 :
150 0 : drop(ctx);
151 :
152 0 : for it in join_all(active_queries).await {
153 0 : drop(it?);
154 : }
155 :
156 0 : debug!("Completed phase {:?}", &apply_spec_phase);
157 :
158 0 : Ok(())
159 0 : }
160 0 : .instrument(span2)
161 0 : .await
162 0 : }
163 :
164 : /// Create a stream of operations to be executed for that phase of applying
165 : /// changes.
166 : ///
167 : /// In the future we may generate a single stream of changes and then
168 : /// sort/merge/batch execution, but for now this is a nice way to improve
169 : /// batching behaviour of the commands.
170 0 : async fn get_operations<'a>(
171 0 : spec: &'a ComputeSpec,
172 0 : ctx: &'a RwLock<MutableApplyContext>,
173 0 : jwks_roles: &'a HashSet<String>,
174 0 : apply_spec_phase: &'a ApplySpecPhase,
175 0 : ) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
176 0 : match apply_spec_phase {
177 : ApplySpecPhase::CreateSuperUser => {
178 0 : let query = construct_superuser_query(spec);
179 0 :
180 0 : Ok(Box::new(once(Operation {
181 0 : query,
182 0 : comment: None,
183 0 : })))
184 : }
185 : ApplySpecPhase::DropInvalidDatabases => {
186 0 : let mut ctx = ctx.write().await;
187 0 : let databases = &mut ctx.dbs;
188 0 :
189 0 : let keys: Vec<_> = databases
190 0 : .iter()
191 0 : .filter(|(_, db)| db.invalid)
192 0 : .map(|(dbname, _)| dbname.clone())
193 0 : .collect();
194 0 :
195 0 : // After recent commit in Postgres, interrupted DROP DATABASE
196 0 : // leaves the database in the invalid state. According to the
197 0 : // commit message, the only option for user is to drop it again.
198 0 : // See:
199 0 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
200 0 : //
201 0 : // Postgres Neon extension is done the way, that db is de-registered
202 0 : // in the control plane metadata only after it is dropped. So there is
203 0 : // a chance that it still thinks that the db should exist. This means
204 0 : // that it will be re-created by the `CreateDatabases` phase. This
205 0 : // is fine, as user can just drop the table again (in vanilla
206 0 : // Postgres they would need to do the same).
207 0 : let operations = keys
208 0 : .into_iter()
209 0 : .filter_map(move |dbname| ctx.dbs.remove(&dbname))
210 0 : .map(|db| Operation {
211 0 : query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()),
212 0 : comment: Some(format!("Dropping invalid database {}", db.name)),
213 0 : });
214 0 :
215 0 : Ok(Box::new(operations))
216 : }
217 : ApplySpecPhase::RenameRoles => {
218 0 : let mut ctx = ctx.write().await;
219 :
220 0 : let operations = spec
221 0 : .delta_operations
222 0 : .iter()
223 0 : .flatten()
224 0 : .filter(|op| op.action == "rename_role")
225 0 : .filter_map(move |op| {
226 0 : let roles = &mut ctx.roles;
227 0 :
228 0 : if roles.contains_key(op.name.as_str()) {
229 0 : None
230 : } else {
231 0 : let new_name = op.new_name.as_ref().unwrap();
232 0 : let mut role = roles.remove(op.name.as_str()).unwrap();
233 0 :
234 0 : role.name = new_name.clone();
235 0 : role.encrypted_password = None;
236 0 : roles.insert(role.name.clone(), role);
237 0 :
238 0 : Some(Operation {
239 0 : query: format!(
240 0 : "ALTER ROLE {} RENAME TO {}",
241 0 : op.name.pg_quote(),
242 0 : new_name.pg_quote()
243 0 : ),
244 0 : comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)),
245 0 : })
246 : }
247 0 : });
248 0 :
249 0 : Ok(Box::new(operations))
250 : }
251 : ApplySpecPhase::CreateAndAlterRoles => {
252 0 : let mut ctx = ctx.write().await;
253 :
254 0 : let operations = spec.cluster.roles
255 0 : .iter()
256 0 : .filter_map(move |role| {
257 0 : let roles = &mut ctx.roles;
258 0 : let db_role = roles.get(&role.name);
259 0 :
260 0 : match db_role {
261 0 : Some(db_role) => {
262 0 : if db_role.encrypted_password != role.encrypted_password {
263 : // This can be run on /every/ role! Not just ones created through the console.
264 : // This means that if you add some funny ALTER here that adds a permission,
265 : // this will get run even on user-created roles! This will result in different
266 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
267 : // now only grants LOGIN and changes the password. Please do not allow this branch
268 : // to do anything silly.
269 0 : Some(Operation {
270 0 : query: format!(
271 0 : "ALTER ROLE {} {}",
272 0 : role.name.pg_quote(),
273 0 : role.to_pg_options(),
274 0 : ),
275 0 : comment: None,
276 0 : })
277 : } else {
278 0 : None
279 : }
280 : }
281 : None => {
282 0 : let query = if !jwks_roles.contains(role.name.as_str()) {
283 0 : format!(
284 0 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}",
285 0 : role.name.pg_quote(),
286 0 : role.to_pg_options(),
287 0 : )
288 : } else {
289 0 : format!(
290 0 : "CREATE ROLE {} {}",
291 0 : role.name.pg_quote(),
292 0 : role.to_pg_options(),
293 0 : )
294 : };
295 0 : Some(Operation {
296 0 : query,
297 0 : comment: Some(format!("creating role {}", role.name)),
298 0 : })
299 : }
300 : }
301 0 : });
302 0 :
303 0 : Ok(Box::new(operations))
304 : }
305 : ApplySpecPhase::RenameAndDeleteDatabases => {
306 0 : let mut ctx = ctx.write().await;
307 :
308 0 : let operations = spec
309 0 : .delta_operations
310 0 : .iter()
311 0 : .flatten()
312 0 : .filter_map(move |op| {
313 0 : let databases = &mut ctx.dbs;
314 0 : match op.action.as_str() {
315 0 : // We do not check whether the DB exists or not,
316 0 : // Postgres will take care of it for us
317 0 : "delete_db" => {
318 : // In Postgres we can't drop a database if it is a template.
319 : // So we need to unset the template flag first, but it could
320 : // be a retry, so we could've already dropped the database.
321 : // Check that database exists first to make it idempotent.
322 0 : let unset_template_query: String = format!(
323 0 : include_str!("sql/unset_template_for_drop_dbs.sql"),
324 0 : datname_str = escape_literal(&op.name),
325 0 : datname = &op.name.pg_quote()
326 0 : );
327 0 :
328 0 : // Use FORCE to drop database even if there are active connections.
329 0 : // We run this from `cloud_admin`, so it should have enough privileges.
330 0 : //
331 0 : // NB: there could be other db states, which prevent us from dropping
332 0 : // the database. For example, if db is used by any active subscription
333 0 : // or replication slot.
334 0 : // Such cases are handled in the DropSubscriptionsForDeletedDatabases
335 0 : // phase. We do all the cleanup before actually dropping the database.
336 0 : let drop_db_query: String = format!(
337 0 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
338 0 : &op.name.pg_quote()
339 0 : );
340 0 :
341 0 : databases.remove(&op.name);
342 0 :
343 0 : Some(vec![
344 0 : Operation {
345 0 : query: unset_template_query,
346 0 : comment: Some(format!(
347 0 : "optionally clearing template flags for DB {}",
348 0 : op.name,
349 0 : )),
350 0 : },
351 0 : Operation {
352 0 : query: drop_db_query,
353 0 : comment: Some(format!("deleting database {}", op.name,)),
354 0 : },
355 0 : ])
356 : }
357 0 : "rename_db" => {
358 0 : if let Some(mut db) = databases.remove(&op.name) {
359 : // update state of known databases
360 0 : let new_name = op.new_name.as_ref().unwrap();
361 0 : db.name = new_name.clone();
362 0 : databases.insert(db.name.clone(), db);
363 0 :
364 0 : Some(vec![Operation {
365 0 : query: format!(
366 0 : "ALTER DATABASE {} RENAME TO {}",
367 0 : op.name.pg_quote(),
368 0 : new_name.pg_quote(),
369 0 : ),
370 0 : comment: Some(format!(
371 0 : "renaming database '{}' to '{}'",
372 0 : op.name, new_name
373 0 : )),
374 0 : }])
375 : } else {
376 0 : None
377 : }
378 : }
379 0 : _ => None,
380 : }
381 0 : })
382 0 : .flatten();
383 0 :
384 0 : Ok(Box::new(operations))
385 : }
386 : ApplySpecPhase::CreateAndAlterDatabases => {
387 0 : let mut ctx = ctx.write().await;
388 :
389 0 : let operations = spec
390 0 : .cluster
391 0 : .databases
392 0 : .iter()
393 0 : .filter_map(move |db| {
394 0 : let databases = &mut ctx.dbs;
395 0 : if let Some(edb) = databases.get_mut(&db.name) {
396 0 : let change_owner = if edb.owner.starts_with('"') {
397 0 : db.owner.pg_quote() != edb.owner
398 : } else {
399 0 : db.owner != edb.owner
400 : };
401 :
402 0 : edb.owner = db.owner.clone();
403 0 :
404 0 : if change_owner {
405 0 : Some(vec![Operation {
406 0 : query: format!(
407 0 : "ALTER DATABASE {} OWNER TO {}",
408 0 : db.name.pg_quote(),
409 0 : db.owner.pg_quote()
410 0 : ),
411 0 : comment: Some(format!(
412 0 : "changing database owner of database {} to {}",
413 0 : db.name, db.owner
414 0 : )),
415 0 : }])
416 : } else {
417 0 : None
418 : }
419 : } else {
420 0 : databases.insert(db.name.clone(), db.clone());
421 0 :
422 0 : Some(vec![
423 0 : Operation {
424 0 : query: format!(
425 0 : "CREATE DATABASE {} {}",
426 0 : db.name.pg_quote(),
427 0 : db.to_pg_options(),
428 0 : ),
429 0 : comment: None,
430 0 : },
431 0 : Operation {
432 0 : query: format!(
433 0 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
434 0 : db.name.pg_quote()
435 0 : ),
436 0 : comment: None,
437 0 : },
438 0 : ])
439 : }
440 0 : })
441 0 : .flatten();
442 0 :
443 0 : Ok(Box::new(operations))
444 : }
445 0 : ApplySpecPhase::RunInEachDatabase { db, subphase } => {
446 0 : match subphase {
447 : PerDatabasePhase::DropSubscriptionsForDeletedDatabases => {
448 0 : match &db {
449 0 : DB::UserDB(db) => {
450 0 : let drop_subscription_query: String = format!(
451 0 : include_str!("sql/drop_subscription_for_drop_dbs.sql"),
452 0 : datname_str = escape_literal(&db.name),
453 0 : );
454 0 :
455 0 : let operations = vec![Operation {
456 0 : query: drop_subscription_query,
457 0 : comment: Some(format!(
458 0 : "optionally dropping subscriptions for DB {}",
459 0 : db.name,
460 0 : )),
461 0 : }]
462 0 : .into_iter();
463 0 :
464 0 : Ok(Box::new(operations))
465 : }
466 : // skip this cleanup for the system databases
467 : // because users can't drop them
468 0 : DB::SystemDB => Ok(Box::new(empty())),
469 : }
470 : }
471 : PerDatabasePhase::DeleteDBRoleReferences => {
472 0 : let ctx = ctx.read().await;
473 :
474 0 : let operations =
475 0 : spec.delta_operations
476 0 : .iter()
477 0 : .flatten()
478 0 : .filter(|op| op.action == "delete_role")
479 0 : .filter_map(move |op| {
480 0 : if db.is_owned_by(&op.name) {
481 0 : return None;
482 0 : }
483 0 : if !ctx.roles.contains_key(&op.name) {
484 0 : return None;
485 0 : }
486 0 : let quoted = op.name.pg_quote();
487 0 : let new_owner = match &db {
488 0 : DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
489 0 : DB::UserDB(db) => db.owner.pg_quote(),
490 : };
491 :
492 0 : Some(vec![
493 0 : // This will reassign all dependent objects to the db owner
494 0 : Operation {
495 0 : query: format!(
496 0 : "REASSIGN OWNED BY {} TO {}",
497 0 : quoted, new_owner,
498 0 : ),
499 0 : comment: None,
500 0 : },
501 0 : // Revoke some potentially blocking privileges (Neon-specific currently)
502 0 : Operation {
503 0 : query: format!(
504 0 : include_str!("sql/pre_drop_role_revoke_privileges.sql"),
505 0 : role_name = quoted,
506 0 : ),
507 0 : comment: None,
508 0 : },
509 0 : // This now will only drop privileges of the role
510 0 : // TODO: this is obviously not 100% true because of the above case,
511 0 : // there could be still some privileges that are not revoked. Maybe this
512 0 : // only drops privileges that were granted *by this* role, not *to this* role,
513 0 : // but this has to be checked.
514 0 : Operation {
515 0 : query: format!("DROP OWNED BY {}", quoted),
516 0 : comment: None,
517 0 : },
518 0 : ])
519 0 : })
520 0 : .flatten();
521 0 :
522 0 : Ok(Box::new(operations))
523 : }
524 : PerDatabasePhase::ChangeSchemaPerms => {
525 0 : let ctx = ctx.read().await;
526 0 : let databases = &ctx.dbs;
527 :
528 0 : let db = match &db {
529 : // ignore schema permissions on the system database
530 0 : DB::SystemDB => return Ok(Box::new(empty())),
531 0 : DB::UserDB(db) => db,
532 0 : };
533 0 :
534 0 : if databases.get(&db.name).is_none() {
535 0 : bail!("database {} doesn't exist in PostgreSQL", db.name);
536 0 : }
537 0 :
538 0 : let edb = databases.get(&db.name).unwrap();
539 0 :
540 0 : if edb.restrict_conn || edb.invalid {
541 0 : return Ok(Box::new(empty()));
542 0 : }
543 0 :
544 0 : let operations = vec![
545 0 : Operation {
546 0 : query: format!(
547 0 : include_str!("sql/set_public_schema_owner.sql"),
548 0 : db_owner = db.owner.pg_quote()
549 0 : ),
550 0 : comment: None,
551 0 : },
552 0 : Operation {
553 0 : query: String::from(include_str!("sql/default_grants.sql")),
554 0 : comment: None,
555 0 : },
556 0 : ]
557 0 : .into_iter();
558 0 :
559 0 : Ok(Box::new(operations))
560 : }
561 : PerDatabasePhase::HandleAnonExtension => {
562 : // Only install Anon into user databases
563 0 : let db = match &db {
564 0 : DB::SystemDB => return Ok(Box::new(empty())),
565 0 : DB::UserDB(db) => db,
566 0 : };
567 0 : // Never install Anon when it's not enabled as feature
568 0 : if !spec.features.contains(&ComputeFeature::AnonExtension) {
569 0 : return Ok(Box::new(empty()));
570 0 : }
571 0 :
572 0 : // Only install Anon when it's added in preload libraries
573 0 : let opt_libs = spec.cluster.settings.find("shared_preload_libraries");
574 :
575 0 : let libs = match opt_libs {
576 0 : Some(libs) => libs,
577 0 : None => return Ok(Box::new(empty())),
578 : };
579 :
580 0 : if !libs.contains("anon") {
581 0 : return Ok(Box::new(empty()));
582 0 : }
583 0 :
584 0 : let db_owner = db.owner.pg_quote();
585 0 :
586 0 : let operations = vec![
587 0 : // Create anon extension if this compute needs it
588 0 : // Users cannot create it themselves, because superuser is required.
589 0 : Operation {
590 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS anon CASCADE"),
591 0 : comment: Some(String::from("creating anon extension")),
592 0 : },
593 0 : // Initialize anon extension
594 0 : // This also requires superuser privileges, so users cannot do it themselves.
595 0 : Operation {
596 0 : query: String::from("SELECT anon.init()"),
597 0 : comment: Some(String::from("initializing anon extension data")),
598 0 : },
599 0 : Operation {
600 0 : query: format!("GRANT ALL ON SCHEMA anon TO {}", db_owner),
601 0 : comment: Some(String::from(
602 0 : "granting anon extension schema permissions",
603 0 : )),
604 0 : },
605 0 : Operation {
606 0 : query: format!(
607 0 : "GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}",
608 0 : db_owner
609 0 : ),
610 0 : comment: Some(String::from(
611 0 : "granting anon extension schema functions permissions",
612 0 : )),
613 0 : },
614 0 : // We need this, because some functions are defined as SECURITY DEFINER.
615 0 : // In Postgres SECURITY DEFINER functions are executed with the privileges
616 0 : // of the owner.
617 0 : // In anon extension this it is needed to access some GUCs, which are only accessible to
618 0 : // superuser. But we've patched postgres to allow db_owner to access them as well.
619 0 : // So we need to change owner of these functions to db_owner.
620 0 : Operation {
621 0 : query: format!(
622 0 : include_str!("sql/anon_ext_fn_reassign.sql"),
623 0 : db_owner = db_owner,
624 0 : ),
625 0 : comment: Some(String::from(
626 0 : "change anon extension functions owner to database_owner",
627 0 : )),
628 0 : },
629 0 : Operation {
630 0 : query: format!(
631 0 : "GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}",
632 0 : db_owner,
633 0 : ),
634 0 : comment: Some(String::from(
635 0 : "granting anon extension tables permissions",
636 0 : )),
637 0 : },
638 0 : Operation {
639 0 : query: format!(
640 0 : "GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}",
641 0 : db_owner,
642 0 : ),
643 0 : comment: Some(String::from(
644 0 : "granting anon extension sequences permissions",
645 0 : )),
646 0 : },
647 0 : ]
648 0 : .into_iter();
649 0 :
650 0 : Ok(Box::new(operations))
651 : }
652 : }
653 : }
654 : // Interestingly, we only install p_s_s in the main database, even when
655 : // it's preloaded.
656 : ApplySpecPhase::HandleOtherExtensions => {
657 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
658 0 : if libs.contains("pg_stat_statements") {
659 0 : return Ok(Box::new(once(Operation {
660 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
661 0 : comment: Some(String::from("create system extensions")),
662 0 : })));
663 0 : }
664 0 : }
665 0 : Ok(Box::new(empty()))
666 : }
667 : ApplySpecPhase::HandleNeonExtension => {
668 0 : let operations = vec![
669 0 : Operation {
670 0 : query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
671 0 : comment: Some(String::from("init: add schema for extension")),
672 0 : },
673 0 : Operation {
674 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
675 0 : comment: Some(String::from(
676 0 : "init: install the extension if not already installed",
677 0 : )),
678 0 : },
679 0 : Operation {
680 0 : query: String::from(
681 0 : "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
682 0 : ),
683 0 : comment: Some(String::from("compat/fix: make neon relocatable")),
684 0 : },
685 0 : Operation {
686 0 : query: String::from("ALTER EXTENSION neon SET SCHEMA neon"),
687 0 : comment: Some(String::from("compat/fix: alter neon extension schema")),
688 0 : },
689 0 : Operation {
690 0 : query: String::from("ALTER EXTENSION neon UPDATE"),
691 0 : comment: Some(String::from("compat/update: update neon extension version")),
692 0 : },
693 0 : ]
694 0 : .into_iter();
695 0 :
696 0 : Ok(Box::new(operations))
697 : }
698 0 : ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
699 0 : query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
700 0 : comment: None,
701 0 : }))),
702 : ApplySpecPhase::DropRoles => {
703 0 : let operations = spec
704 0 : .delta_operations
705 0 : .iter()
706 0 : .flatten()
707 0 : .filter(|op| op.action == "delete_role")
708 0 : .map(|op| Operation {
709 0 : query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()),
710 0 : comment: None,
711 0 : });
712 0 :
713 0 : Ok(Box::new(operations))
714 : }
715 : }
716 0 : }
|