Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::fmt::{Debug, Formatter};
3 : use std::future::Future;
4 : use std::iter::empty;
5 : use std::iter::once;
6 : use std::sync::Arc;
7 :
8 : use crate::compute::construct_superuser_query;
9 : use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
10 : use anyhow::{bail, Result};
11 : use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
12 : use futures::future::join_all;
13 : use tokio::sync::RwLock;
14 : use tokio_postgres::Client;
15 : use tracing::{debug, info_span, Instrument};
16 :
17 : #[derive(Clone)]
18 : pub enum DB {
19 : SystemDB,
20 : UserDB(Database),
21 : }
22 :
23 : impl DB {
24 0 : pub fn new(db: Database) -> DB {
25 0 : Self::UserDB(db)
26 0 : }
27 :
28 0 : pub fn is_owned_by(&self, role: &PgIdent) -> bool {
29 0 : match self {
30 0 : DB::SystemDB => false,
31 0 : DB::UserDB(db) => &db.owner == role,
32 : }
33 0 : }
34 : }
35 :
36 : impl Debug for DB {
37 0 : fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38 0 : match self {
39 0 : DB::SystemDB => f.debug_tuple("SystemDB").finish(),
40 0 : DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(),
41 : }
42 0 : }
43 : }
44 :
/// Sub-phases that are executed inside one specific database as part of
/// [`ApplySpecPhase::RunInEachDatabase`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PerDatabasePhase {
    /// Reassign and drop everything owned by roles that are about to be
    /// deleted (`delete_role` delta operations).
    DeleteDBRoleReferences,
    /// Set the owner of the public schema and apply the default grants.
    ChangeSchemaPerms,
    /// Install and initialize the `anon` extension and grant the database
    /// owner access to it, when the feature is enabled and preloaded.
    HandleAnonExtension,
}
51 :
52 : #[derive(Clone, Debug)]
53 : pub enum ApplySpecPhase {
54 : CreateSuperUser,
55 : DropInvalidDatabases,
56 : RenameRoles,
57 : CreateAndAlterRoles,
58 : RenameAndDeleteDatabases,
59 : CreateAndAlterDatabases,
60 : RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
61 : HandleOtherExtensions,
62 : HandleNeonExtension,
63 : CreateAvailabilityCheck,
64 : DropRoles,
65 : }
66 :
/// A single SQL statement produced for an apply phase.
#[derive(Debug, Clone)]
pub struct Operation {
    /// SQL text that is sent to PostgreSQL as a simple query.
    pub query: String,
    /// Optional human-readable description, attached to the tracing span
    /// under which the query is executed.
    pub comment: Option<String>,
}
71 :
72 : pub struct MutableApplyContext {
73 : pub roles: HashMap<String, Role>,
74 : pub dbs: HashMap<String, Database>,
75 : }
76 :
77 : /// Appply the operations that belong to the given spec apply phase.
78 : ///
79 : /// Commands within a single phase are executed in order of Iterator yield.
80 : /// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
81 : /// indicated by its `db` field, and can share a single client for all changes
82 : /// to that database.
83 : ///
84 : /// Notes:
85 : /// - Commands are pipelined, and thus may cause incomplete apply if one
86 : /// command of many fails.
87 : /// - Failing commands will fail the phase's apply step once the return value
88 : /// is processed.
89 : /// - No timeouts have (yet) been implemented.
90 : /// - The caller is responsible for limiting and/or applying concurrency.
91 0 : pub async fn apply_operations<'a, Fut, F>(
92 0 : spec: Arc<ComputeSpec>,
93 0 : ctx: Arc<RwLock<MutableApplyContext>>,
94 0 : jwks_roles: Arc<HashSet<String>>,
95 0 : apply_spec_phase: ApplySpecPhase,
96 0 : client: F,
97 0 : ) -> Result<()>
98 0 : where
99 0 : F: FnOnce() -> Fut,
100 0 : Fut: Future<Output = Result<&'a Client>>,
101 0 : {
102 0 : debug!("Starting phase {:?}", &apply_spec_phase);
103 0 : let span = info_span!("db_apply_changes", phase=?apply_spec_phase);
104 0 : let span2 = span.clone();
105 0 : async move {
106 0 : debug!("Processing phase {:?}", &apply_spec_phase);
107 0 : let ctx = ctx;
108 :
109 0 : let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase)
110 0 : .await?
111 0 : .peekable();
112 0 :
113 0 : // Return (and by doing so, skip requesting the PostgreSQL client) if
114 0 : // we don't have any operations scheduled.
115 0 : if ops.peek().is_none() {
116 0 : return Ok(());
117 0 : }
118 :
119 0 : let client = client().await?;
120 :
121 0 : debug!("Applying phase {:?}", &apply_spec_phase);
122 :
123 0 : let active_queries = ops
124 0 : .map(|op| {
125 0 : let Operation { comment, query } = op;
126 0 : let inspan = match comment {
127 0 : None => span.clone(),
128 0 : Some(comment) => info_span!("phase {}: {}", comment),
129 : };
130 :
131 0 : async {
132 0 : let query = query;
133 0 : let res = client.simple_query(&query).await;
134 0 : debug!(
135 0 : "{} {}",
136 0 : if res.is_ok() {
137 0 : "successfully executed"
138 : } else {
139 0 : "failed to execute"
140 : },
141 : query
142 : );
143 0 : res
144 0 : }
145 0 : .instrument(inspan)
146 0 : })
147 0 : .collect::<Vec<_>>();
148 0 :
149 0 : drop(ctx);
150 :
151 0 : for it in join_all(active_queries).await {
152 0 : drop(it?);
153 : }
154 :
155 0 : debug!("Completed phase {:?}", &apply_spec_phase);
156 :
157 0 : Ok(())
158 0 : }
159 0 : .instrument(span2)
160 0 : .await
161 0 : }
162 :
163 : /// Create a stream of operations to be executed for that phase of applying
164 : /// changes.
165 : ///
166 : /// In the future we may generate a single stream of changes and then
167 : /// sort/merge/batch execution, but for now this is a nice way to improve
168 : /// batching behaviour of the commands.
169 0 : async fn get_operations<'a>(
170 0 : spec: &'a ComputeSpec,
171 0 : ctx: &'a RwLock<MutableApplyContext>,
172 0 : jwks_roles: &'a HashSet<String>,
173 0 : apply_spec_phase: &'a ApplySpecPhase,
174 0 : ) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
175 0 : match apply_spec_phase {
176 : ApplySpecPhase::CreateSuperUser => {
177 0 : let query = construct_superuser_query(spec);
178 0 :
179 0 : Ok(Box::new(once(Operation {
180 0 : query,
181 0 : comment: None,
182 0 : })))
183 : }
184 : ApplySpecPhase::DropInvalidDatabases => {
185 0 : let mut ctx = ctx.write().await;
186 0 : let databases = &mut ctx.dbs;
187 0 :
188 0 : let keys: Vec<_> = databases
189 0 : .iter()
190 0 : .filter(|(_, db)| db.invalid)
191 0 : .map(|(dbname, _)| dbname.clone())
192 0 : .collect();
193 0 :
194 0 : // After recent commit in Postgres, interrupted DROP DATABASE
195 0 : // leaves the database in the invalid state. According to the
196 0 : // commit message, the only option for user is to drop it again.
197 0 : // See:
198 0 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
199 0 : //
200 0 : // Postgres Neon extension is done the way, that db is de-registered
201 0 : // in the control plane metadata only after it is dropped. So there is
202 0 : // a chance that it still thinks that the db should exist. This means
203 0 : // that it will be re-created by the `CreateDatabases` phase. This
204 0 : // is fine, as user can just drop the table again (in vanilla
205 0 : // Postgres they would need to do the same).
206 0 : let operations = keys
207 0 : .into_iter()
208 0 : .filter_map(move |dbname| ctx.dbs.remove(&dbname))
209 0 : .map(|db| Operation {
210 0 : query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()),
211 0 : comment: Some(format!("Dropping invalid database {}", db.name)),
212 0 : });
213 0 :
214 0 : Ok(Box::new(operations))
215 : }
216 : ApplySpecPhase::RenameRoles => {
217 0 : let mut ctx = ctx.write().await;
218 :
219 0 : let operations = spec
220 0 : .delta_operations
221 0 : .iter()
222 0 : .flatten()
223 0 : .filter(|op| op.action == "rename_role")
224 0 : .filter_map(move |op| {
225 0 : let roles = &mut ctx.roles;
226 0 :
227 0 : if roles.contains_key(op.name.as_str()) {
228 0 : None
229 : } else {
230 0 : let new_name = op.new_name.as_ref().unwrap();
231 0 : let mut role = roles.remove(op.name.as_str()).unwrap();
232 0 :
233 0 : role.name = new_name.clone();
234 0 : role.encrypted_password = None;
235 0 : roles.insert(role.name.clone(), role);
236 0 :
237 0 : Some(Operation {
238 0 : query: format!(
239 0 : "ALTER ROLE {} RENAME TO {}",
240 0 : op.name.pg_quote(),
241 0 : new_name.pg_quote()
242 0 : ),
243 0 : comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)),
244 0 : })
245 : }
246 0 : });
247 0 :
248 0 : Ok(Box::new(operations))
249 : }
250 : ApplySpecPhase::CreateAndAlterRoles => {
251 0 : let mut ctx = ctx.write().await;
252 :
253 0 : let operations = spec.cluster.roles
254 0 : .iter()
255 0 : .filter_map(move |role| {
256 0 : let roles = &mut ctx.roles;
257 0 : let db_role = roles.get(&role.name);
258 0 :
259 0 : match db_role {
260 0 : Some(db_role) => {
261 0 : if db_role.encrypted_password != role.encrypted_password {
262 : // This can be run on /every/ role! Not just ones created through the console.
263 : // This means that if you add some funny ALTER here that adds a permission,
264 : // this will get run even on user-created roles! This will result in different
265 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
266 : // now only grants LOGIN and changes the password. Please do not allow this branch
267 : // to do anything silly.
268 0 : Some(Operation {
269 0 : query: format!(
270 0 : "ALTER ROLE {} {}",
271 0 : role.name.pg_quote(),
272 0 : role.to_pg_options(),
273 0 : ),
274 0 : comment: None,
275 0 : })
276 : } else {
277 0 : None
278 : }
279 : }
280 : None => {
281 0 : let query = if !jwks_roles.contains(role.name.as_str()) {
282 0 : format!(
283 0 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}",
284 0 : role.name.pg_quote(),
285 0 : role.to_pg_options(),
286 0 : )
287 : } else {
288 0 : format!(
289 0 : "CREATE ROLE {} {}",
290 0 : role.name.pg_quote(),
291 0 : role.to_pg_options(),
292 0 : )
293 : };
294 0 : Some(Operation {
295 0 : query,
296 0 : comment: Some(format!("creating role {}", role.name)),
297 0 : })
298 : }
299 : }
300 0 : });
301 0 :
302 0 : Ok(Box::new(operations))
303 : }
304 : ApplySpecPhase::RenameAndDeleteDatabases => {
305 0 : let mut ctx = ctx.write().await;
306 :
307 0 : let operations = spec
308 0 : .delta_operations
309 0 : .iter()
310 0 : .flatten()
311 0 : .filter_map(move |op| {
312 0 : let databases = &mut ctx.dbs;
313 0 : match op.action.as_str() {
314 0 : // We do not check whether the DB exists or not,
315 0 : // Postgres will take care of it for us
316 0 : "delete_db" => {
317 : // In Postgres we can't drop a database if it is a template.
318 : // So we need to unset the template flag first, but it could
319 : // be a retry, so we could've already dropped the database.
320 : // Check that database exists first to make it idempotent.
321 0 : let unset_template_query: String = format!(
322 0 : include_str!("sql/unset_template_for_drop_dbs.sql"),
323 0 : datname_str = escape_literal(&op.name),
324 0 : datname = &op.name.pg_quote()
325 0 : );
326 0 :
327 0 : // Use FORCE to drop database even if there are active connections.
328 0 : // We run this from `cloud_admin`, so it should have enough privileges.
329 0 : // NB: there could be other db states, which prevent us from dropping
330 0 : // the database. For example, if db is used by any active subscription
331 0 : // or replication slot.
332 0 : // TODO: deal with it once we allow logical replication. Proper fix should
333 0 : // involve returning an error code to the control plane, so it could
334 0 : // figure out that this is a non-retryable error, return it to the user
335 0 : // and fail operation permanently.
336 0 : let drop_db_query: String = format!(
337 0 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
338 0 : &op.name.pg_quote()
339 0 : );
340 0 :
341 0 : databases.remove(&op.name);
342 0 :
343 0 : Some(vec![
344 0 : Operation {
345 0 : query: unset_template_query,
346 0 : comment: Some(format!(
347 0 : "optionally clearing template flags for DB {}",
348 0 : op.name,
349 0 : )),
350 0 : },
351 0 : Operation {
352 0 : query: drop_db_query,
353 0 : comment: Some(format!("deleting database {}", op.name,)),
354 0 : },
355 0 : ])
356 : }
357 0 : "rename_db" => {
358 0 : if let Some(mut db) = databases.remove(&op.name) {
359 : // update state of known databases
360 0 : let new_name = op.new_name.as_ref().unwrap();
361 0 : db.name = new_name.clone();
362 0 : databases.insert(db.name.clone(), db);
363 0 :
364 0 : Some(vec![Operation {
365 0 : query: format!(
366 0 : "ALTER DATABASE {} RENAME TO {}",
367 0 : op.name.pg_quote(),
368 0 : new_name.pg_quote(),
369 0 : ),
370 0 : comment: Some(format!(
371 0 : "renaming database '{}' to '{}'",
372 0 : op.name, new_name
373 0 : )),
374 0 : }])
375 : } else {
376 0 : None
377 : }
378 : }
379 0 : _ => None,
380 : }
381 0 : })
382 0 : .flatten();
383 0 :
384 0 : Ok(Box::new(operations))
385 : }
386 : ApplySpecPhase::CreateAndAlterDatabases => {
387 0 : let mut ctx = ctx.write().await;
388 :
389 0 : let operations = spec
390 0 : .cluster
391 0 : .databases
392 0 : .iter()
393 0 : .filter_map(move |db| {
394 0 : let databases = &mut ctx.dbs;
395 0 : if let Some(edb) = databases.get_mut(&db.name) {
396 0 : let change_owner = if edb.owner.starts_with('"') {
397 0 : db.owner.pg_quote() != edb.owner
398 : } else {
399 0 : db.owner != edb.owner
400 : };
401 :
402 0 : edb.owner = db.owner.clone();
403 0 :
404 0 : if change_owner {
405 0 : Some(vec![Operation {
406 0 : query: format!(
407 0 : "ALTER DATABASE {} OWNER TO {}",
408 0 : db.name.pg_quote(),
409 0 : db.owner.pg_quote()
410 0 : ),
411 0 : comment: Some(format!(
412 0 : "changing database owner of database {} to {}",
413 0 : db.name, db.owner
414 0 : )),
415 0 : }])
416 : } else {
417 0 : None
418 : }
419 : } else {
420 0 : databases.insert(db.name.clone(), db.clone());
421 0 :
422 0 : Some(vec![
423 0 : Operation {
424 0 : query: format!(
425 0 : "CREATE DATABASE {} {}",
426 0 : db.name.pg_quote(),
427 0 : db.to_pg_options(),
428 0 : ),
429 0 : comment: None,
430 0 : },
431 0 : Operation {
432 0 : query: format!(
433 0 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
434 0 : db.name.pg_quote()
435 0 : ),
436 0 : comment: None,
437 0 : },
438 0 : ])
439 : }
440 0 : })
441 0 : .flatten();
442 0 :
443 0 : Ok(Box::new(operations))
444 : }
445 0 : ApplySpecPhase::RunInEachDatabase { db, subphase } => {
446 0 : match subphase {
447 : PerDatabasePhase::DeleteDBRoleReferences => {
448 0 : let ctx = ctx.read().await;
449 :
450 0 : let operations =
451 0 : spec.delta_operations
452 0 : .iter()
453 0 : .flatten()
454 0 : .filter(|op| op.action == "delete_role")
455 0 : .filter_map(move |op| {
456 0 : if db.is_owned_by(&op.name) {
457 0 : return None;
458 0 : }
459 0 : if !ctx.roles.contains_key(&op.name) {
460 0 : return None;
461 0 : }
462 0 : let quoted = op.name.pg_quote();
463 0 : let new_owner = match &db {
464 0 : DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
465 0 : DB::UserDB(db) => db.owner.pg_quote(),
466 : };
467 :
468 0 : Some(vec![
469 0 : // This will reassign all dependent objects to the db owner
470 0 : Operation {
471 0 : query: format!(
472 0 : "REASSIGN OWNED BY {} TO {}",
473 0 : quoted, new_owner,
474 0 : ),
475 0 : comment: None,
476 0 : },
477 0 : // This now will only drop privileges of the role
478 0 : Operation {
479 0 : query: format!("DROP OWNED BY {}", quoted),
480 0 : comment: None,
481 0 : },
482 0 : ])
483 0 : })
484 0 : .flatten();
485 0 :
486 0 : Ok(Box::new(operations))
487 : }
488 : PerDatabasePhase::ChangeSchemaPerms => {
489 0 : let ctx = ctx.read().await;
490 0 : let databases = &ctx.dbs;
491 :
492 0 : let db = match &db {
493 : // ignore schema permissions on the system database
494 0 : DB::SystemDB => return Ok(Box::new(empty())),
495 0 : DB::UserDB(db) => db,
496 0 : };
497 0 :
498 0 : if databases.get(&db.name).is_none() {
499 0 : bail!("database {} doesn't exist in PostgreSQL", db.name);
500 0 : }
501 0 :
502 0 : let edb = databases.get(&db.name).unwrap();
503 0 :
504 0 : if edb.restrict_conn || edb.invalid {
505 0 : return Ok(Box::new(empty()));
506 0 : }
507 0 :
508 0 : let operations = vec![
509 0 : Operation {
510 0 : query: format!(
511 0 : include_str!("sql/set_public_schema_owner.sql"),
512 0 : db_owner = db.owner.pg_quote()
513 0 : ),
514 0 : comment: None,
515 0 : },
516 0 : Operation {
517 0 : query: String::from(include_str!("sql/default_grants.sql")),
518 0 : comment: None,
519 0 : },
520 0 : ]
521 0 : .into_iter();
522 0 :
523 0 : Ok(Box::new(operations))
524 : }
525 : PerDatabasePhase::HandleAnonExtension => {
526 : // Only install Anon into user databases
527 0 : let db = match &db {
528 0 : DB::SystemDB => return Ok(Box::new(empty())),
529 0 : DB::UserDB(db) => db,
530 0 : };
531 0 : // Never install Anon when it's not enabled as feature
532 0 : if !spec.features.contains(&ComputeFeature::AnonExtension) {
533 0 : return Ok(Box::new(empty()));
534 0 : }
535 0 :
536 0 : // Only install Anon when it's added in preload libraries
537 0 : let opt_libs = spec.cluster.settings.find("shared_preload_libraries");
538 :
539 0 : let libs = match opt_libs {
540 0 : Some(libs) => libs,
541 0 : None => return Ok(Box::new(empty())),
542 : };
543 :
544 0 : if !libs.contains("anon") {
545 0 : return Ok(Box::new(empty()));
546 0 : }
547 0 :
548 0 : let db_owner = db.owner.pg_quote();
549 0 :
550 0 : let operations = vec![
551 0 : // Create anon extension if this compute needs it
552 0 : // Users cannot create it themselves, because superuser is required.
553 0 : Operation {
554 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS anon CASCADE"),
555 0 : comment: Some(String::from("creating anon extension")),
556 0 : },
557 0 : // Initialize anon extension
558 0 : // This also requires superuser privileges, so users cannot do it themselves.
559 0 : Operation {
560 0 : query: String::from("SELECT anon.init()"),
561 0 : comment: Some(String::from("initializing anon extension data")),
562 0 : },
563 0 : Operation {
564 0 : query: format!("GRANT ALL ON SCHEMA anon TO {}", db_owner),
565 0 : comment: Some(String::from(
566 0 : "granting anon extension schema permissions",
567 0 : )),
568 0 : },
569 0 : Operation {
570 0 : query: format!(
571 0 : "GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}",
572 0 : db_owner
573 0 : ),
574 0 : comment: Some(String::from(
575 0 : "granting anon extension schema functions permissions",
576 0 : )),
577 0 : },
578 0 : // We need this, because some functions are defined as SECURITY DEFINER.
579 0 : // In Postgres SECURITY DEFINER functions are executed with the privileges
580 0 : // of the owner.
581 0 : // In anon extension this it is needed to access some GUCs, which are only accessible to
582 0 : // superuser. But we've patched postgres to allow db_owner to access them as well.
583 0 : // So we need to change owner of these functions to db_owner.
584 0 : Operation {
585 0 : query: format!(
586 0 : include_str!("sql/anon_ext_fn_reassign.sql"),
587 0 : db_owner = db_owner,
588 0 : ),
589 0 : comment: Some(String::from(
590 0 : "change anon extension functions owner to database_owner",
591 0 : )),
592 0 : },
593 0 : Operation {
594 0 : query: format!(
595 0 : "GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}",
596 0 : db_owner,
597 0 : ),
598 0 : comment: Some(String::from(
599 0 : "granting anon extension tables permissions",
600 0 : )),
601 0 : },
602 0 : Operation {
603 0 : query: format!(
604 0 : "GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}",
605 0 : db_owner,
606 0 : ),
607 0 : comment: Some(String::from(
608 0 : "granting anon extension sequences permissions",
609 0 : )),
610 0 : },
611 0 : ]
612 0 : .into_iter();
613 0 :
614 0 : Ok(Box::new(operations))
615 : }
616 : }
617 : }
618 : // Interestingly, we only install p_s_s in the main database, even when
619 : // it's preloaded.
620 : ApplySpecPhase::HandleOtherExtensions => {
621 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
622 0 : if libs.contains("pg_stat_statements") {
623 0 : return Ok(Box::new(once(Operation {
624 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
625 0 : comment: Some(String::from("create system extensions")),
626 0 : })));
627 0 : }
628 0 : }
629 0 : Ok(Box::new(empty()))
630 : }
631 : ApplySpecPhase::HandleNeonExtension => {
632 0 : let operations = vec![
633 0 : Operation {
634 0 : query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
635 0 : comment: Some(String::from("init: add schema for extension")),
636 0 : },
637 0 : Operation {
638 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
639 0 : comment: Some(String::from(
640 0 : "init: install the extension if not already installed",
641 0 : )),
642 0 : },
643 0 : Operation {
644 0 : query: String::from(
645 0 : "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
646 0 : ),
647 0 : comment: Some(String::from("compat/fix: make neon relocatable")),
648 0 : },
649 0 : Operation {
650 0 : query: String::from("ALTER EXTENSION neon SET SCHEMA neon"),
651 0 : comment: Some(String::from("compat/fix: alter neon extension schema")),
652 0 : },
653 0 : Operation {
654 0 : query: String::from("ALTER EXTENSION neon UPDATE"),
655 0 : comment: Some(String::from("compat/update: update neon extension version")),
656 0 : },
657 0 : ]
658 0 : .into_iter();
659 0 :
660 0 : Ok(Box::new(operations))
661 : }
662 0 : ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
663 0 : query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
664 0 : comment: None,
665 0 : }))),
666 : ApplySpecPhase::DropRoles => {
667 0 : let operations = spec
668 0 : .delta_operations
669 0 : .iter()
670 0 : .flatten()
671 0 : .filter(|op| op.action == "delete_role")
672 0 : .map(|op| Operation {
673 0 : query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()),
674 0 : comment: None,
675 0 : });
676 0 :
677 0 : Ok(Box::new(operations))
678 : }
679 : }
680 0 : }
|