Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::fmt::{Debug, Formatter};
3 : use std::future::Future;
4 : use std::iter::{empty, once};
5 : use std::sync::Arc;
6 :
7 : use anyhow::{Context, Result};
8 : use compute_api::responses::ComputeStatus;
9 : use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
10 : use futures::future::join_all;
11 : use tokio::sync::RwLock;
12 : use tokio_postgres::Client;
13 : use tokio_postgres::error::SqlState;
14 : use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
15 :
16 : use crate::compute::{ComputeNode, ComputeState, construct_superuser_query};
17 : use crate::pg_helpers::{
18 : DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal, get_existing_dbs_async,
19 : get_existing_roles_async,
20 : };
21 : use crate::spec_apply::ApplySpecPhase::{
22 : CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
23 : CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
24 : HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
25 : RunInEachDatabase,
26 : };
27 : use crate::spec_apply::PerDatabasePhase::{
28 : ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
29 : };
30 :
31 : impl ComputeNode {
32 : /// Apply the spec to the running PostgreSQL instance.
33 : /// The caller can decide to run with multiple clients in parallel, or
34 : /// in single-client mode. Either way, the commands executed will be the same;
35 : /// only commands run in different databases are parallelized.
36 : #[instrument(skip_all)]
37 : pub fn apply_spec_sql(
38 : &self,
39 : spec: Arc<ComputeSpec>,
40 : conf: Arc<tokio_postgres::Config>,
41 : concurrency: usize,
42 : ) -> Result<()> {
43 : info!("Applying config with max {} concurrency", concurrency);
44 : debug!("Config: {:?}", spec);
45 :
46 : let rt = tokio::runtime::Handle::current();
47 0 : rt.block_on(async {
48 : // Proceed with post-startup configuration. Note that the order of operations is important.
49 0 : let client = Self::get_maintenance_client(&conf).await?;
50 0 : let spec = spec.clone();
51 :
52 0 : let databases = get_existing_dbs_async(&client).await?;
53 0 : let roles = get_existing_roles_async(&client)
54 0 : .await?
55 0 : .into_iter()
56 0 : .map(|role| (role.name.clone(), role))
57 0 : .collect::<HashMap<String, Role>>();
58 0 :
59 0 : // Check if we need to drop subscriptions before starting the endpoint.
60 0 : //
61 0 : // It is important to do this operation exactly once when endpoint starts on a new branch.
62 0 : // Otherwise, we may drop not inherited, but newly created subscriptions.
63 0 : //
64 0 : // We cannot rely only on spec.drop_subscriptions_before_start flag,
65 0 : // because if for some reason compute restarts inside VM,
66 0 : // it will start again with the same spec and flag value.
67 0 : //
68 0 : // To handle this, we save the fact of the operation in the database
69 0 : // in the neon.drop_subscriptions_done table.
70 0 : // If the table does not exist, we assume that the operation was never performed, so we must do it.
71 0 : // If table exists, we check if the operation was performed on the current timelilne.
72 0 : //
73 0 : let mut drop_subscriptions_done = false;
74 0 :
75 0 : if spec.drop_subscriptions_before_start {
76 0 : let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
77 0 : let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
78 0 :
79 0 : info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
80 :
81 : drop_subscriptions_done = match
82 0 : client.simple_query(&query).await {
83 0 : Ok(result) => {
84 0 : matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
85 : },
86 0 : Err(e) =>
87 0 : {
88 0 : match e.code() {
89 0 : Some(&SqlState::UNDEFINED_TABLE) => false,
90 : _ => {
91 : // We don't expect any other error here, except for the schema/table not existing
92 0 : error!("Error checking if drop subscription operation was already performed: {}", e);
93 0 : return Err(e.into());
94 : }
95 : }
96 : }
97 : }
98 0 : };
99 :
100 :
101 0 : let jwks_roles = Arc::new(
102 0 : spec.as_ref()
103 0 : .local_proxy_config
104 0 : .iter()
105 0 : .flat_map(|it| &it.jwks)
106 0 : .flatten()
107 0 : .flat_map(|setting| &setting.role_names)
108 0 : .cloned()
109 0 : .collect::<HashSet<_>>(),
110 0 : );
111 0 :
112 0 : let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
113 0 : roles,
114 0 : dbs: databases,
115 0 : }));
116 0 :
117 0 : // Apply special pre drop database phase.
118 0 : // NOTE: we use the code of RunInEachDatabase phase for parallelism
119 0 : // and connection management, but we don't really run it in *each* database,
120 0 : // only in databases, we're about to drop.
121 0 : info!("Applying PerDatabase (pre-dropdb) phase");
122 0 : let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
123 0 :
124 0 : // Run the phase for each database that we're about to drop.
125 0 : let db_processes = spec
126 0 : .delta_operations
127 0 : .iter()
128 0 : .flatten()
129 0 : .filter_map(move |op| {
130 0 : if op.action.as_str() == "delete_db" {
131 0 : Some(op.name.clone())
132 : } else {
133 0 : None
134 : }
135 0 : })
136 0 : .map(|dbname| {
137 0 : let spec = spec.clone();
138 0 : let ctx = ctx.clone();
139 0 : let jwks_roles = jwks_roles.clone();
140 0 : let mut conf = conf.as_ref().clone();
141 0 : let concurrency_token = concurrency_token.clone();
142 0 : // We only need dbname field for this phase, so set other fields to dummy values
143 0 : let db = DB::UserDB(Database {
144 0 : name: dbname.clone(),
145 0 : owner: "cloud_admin".to_string(),
146 0 : options: None,
147 0 : restrict_conn: false,
148 0 : invalid: false,
149 0 : });
150 0 :
151 0 : debug!("Applying per-database phases for Database {:?}", &db);
152 :
153 0 : match &db {
154 0 : DB::SystemDB => {}
155 0 : DB::UserDB(db) => {
156 0 : conf.dbname(db.name.as_str());
157 0 : }
158 : }
159 :
160 0 : let conf = Arc::new(conf);
161 0 : let fut = Self::apply_spec_sql_db(
162 0 : spec.clone(),
163 0 : conf,
164 0 : ctx.clone(),
165 0 : jwks_roles.clone(),
166 0 : concurrency_token.clone(),
167 0 : db,
168 0 : [DropLogicalSubscriptions].to_vec(),
169 0 : );
170 0 :
171 0 : Ok(tokio::spawn(fut))
172 0 : })
173 0 : .collect::<Vec<Result<_, anyhow::Error>>>();
174 :
175 0 : for process in db_processes.into_iter() {
176 0 : let handle = process?;
177 0 : if let Err(e) = handle.await? {
178 : // Handle the error case where the database does not exist.
179 : // We do not check whether the DB exists in the deletion phase,
180 : // so we shouldn't be strict about it in the pre-deletion cleanup either.
181 0 : if e.to_string().contains("does not exist") {
182 0 : warn!("Error dropping subscription: {}", e);
183 : } else {
184 0 : return Err(e);
185 : }
186 0 : };
187 : }
188 :
189 0 : for phase in [
190 0 : CreateSuperUser,
191 0 : DropInvalidDatabases,
192 0 : RenameRoles,
193 0 : CreateAndAlterRoles,
194 0 : RenameAndDeleteDatabases,
195 0 : CreateAndAlterDatabases,
196 0 : CreateSchemaNeon,
197 : ] {
198 0 : info!("Applying phase {:?}", &phase);
199 0 : apply_operations(
200 0 : spec.clone(),
201 0 : ctx.clone(),
202 0 : jwks_roles.clone(),
203 0 : phase,
204 0 : || async { Ok(&client) },
205 0 : )
206 0 : .await?;
207 : }
208 :
209 0 : info!("Applying RunInEachDatabase2 phase");
210 0 : let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
211 0 :
212 0 : let db_processes = spec
213 0 : .cluster
214 0 : .databases
215 0 : .iter()
216 0 : .map(|db| DB::new(db.clone()))
217 0 : // Also run the per-database phases in the system database.
218 0 : .chain(once(DB::SystemDB))
219 0 : .map(|db| {
220 0 : let spec = spec.clone();
221 0 : let ctx = ctx.clone();
222 0 : let jwks_roles = jwks_roles.clone();
223 0 : let mut conf = conf.as_ref().clone();
224 0 : let concurrency_token = concurrency_token.clone();
225 0 : let db = db.clone();
226 0 :
227 0 : debug!("Applying per-database phases for Database {:?}", &db);
228 :
229 0 : match &db {
230 0 : DB::SystemDB => {}
231 0 : DB::UserDB(db) => {
232 0 : conf.dbname(db.name.as_str());
233 0 : }
234 : }
235 :
236 0 : let conf = Arc::new(conf);
237 0 : let mut phases = vec![
238 0 : DeleteDBRoleReferences,
239 0 : ChangeSchemaPerms,
240 0 : HandleAnonExtension,
241 0 : ];
242 0 :
243 0 : if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
244 0 : info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
245 0 : phases.push(DropLogicalSubscriptions);
246 0 : }
247 :
248 0 : let fut = Self::apply_spec_sql_db(
249 0 : spec.clone(),
250 0 : conf,
251 0 : ctx.clone(),
252 0 : jwks_roles.clone(),
253 0 : concurrency_token.clone(),
254 0 : db,
255 0 : phases,
256 0 : );
257 0 :
258 0 : Ok(tokio::spawn(fut))
259 0 : })
260 0 : .collect::<Vec<Result<_, anyhow::Error>>>();
261 :
262 0 : for process in db_processes.into_iter() {
263 0 : let handle = process?;
264 0 : handle.await??;
265 : }
266 :
267 0 : let mut phases = vec![
268 0 : HandleOtherExtensions,
269 0 : HandleNeonExtension, // This step depends on CreateSchemaNeon
270 0 : CreateAvailabilityCheck,
271 0 : DropRoles,
272 0 : ];
273 0 :
274 0 : // This step depends on CreateSchemaNeon
275 0 : if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
276 0 : info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
277 0 : phases.push(FinalizeDropLogicalSubscriptions);
278 0 : }
279 :
280 0 : for phase in phases {
281 0 : debug!("Applying phase {:?}", &phase);
282 0 : apply_operations(
283 0 : spec.clone(),
284 0 : ctx.clone(),
285 0 : jwks_roles.clone(),
286 0 : phase,
287 0 : || async { Ok(&client) },
288 0 : )
289 0 : .await?;
290 : }
291 :
292 0 : Ok::<(), anyhow::Error>(())
293 0 : })?;
294 :
295 : Ok(())
296 : }
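// Illustrative sketch (not part of this file): a caller would typically size the
// concurrency with `max_service_connections` and then hand everything over to
// `apply_spec_sql`. The variables `compute`, `compute_state`, `spec`, and `conf`
// below are hypothetical stand-ins for whatever the real call site has at hand:
//
//     let concurrency = compute.max_service_connections(&compute_state, &spec);
//     compute.apply_spec_sql(
//         Arc::new(spec),
//         Arc::new(conf), // tokio_postgres::Config for the maintenance connection
//         concurrency,
//     )?;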
297 :
298 : /// Apply SQL migrations of the RunInEachDatabase phase.
299 : ///
300 : /// May opt to not connect to databases that don't have any scheduled
301 : /// operations. The function is concurrency-controlled with the provided
302 : /// semaphore. The caller has to make sure the semaphore isn't exhausted.
303 0 : async fn apply_spec_sql_db(
304 0 : spec: Arc<ComputeSpec>,
305 0 : conf: Arc<tokio_postgres::Config>,
306 0 : ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
307 0 : jwks_roles: Arc<HashSet<String>>,
308 0 : concurrency_token: Arc<tokio::sync::Semaphore>,
309 0 : db: DB,
310 0 : subphases: Vec<PerDatabasePhase>,
311 0 : ) -> Result<()> {
312 0 : let _permit = concurrency_token.acquire().await?;
313 :
314 0 : let mut client_conn = None;
315 :
316 0 : for subphase in subphases {
317 0 : apply_operations(
318 0 : spec.clone(),
319 0 : ctx.clone(),
320 0 : jwks_roles.clone(),
321 0 : RunInEachDatabase {
322 0 : db: db.clone(),
323 0 : subphase,
324 0 : },
325 0 : // Only connect if apply_operation actually wants a connection.
326 0 : // It's quite possible this database doesn't need any queries,
327 0 : // so by not connecting we save time and effort connecting to
328 0 : // that database.
329 0 : || async {
330 0 : if client_conn.is_none() {
331 0 : let db_client = Self::get_maintenance_client(&conf).await?;
332 0 : client_conn.replace(db_client);
333 0 : }
334 0 : let client = client_conn.as_ref().unwrap();
335 0 : Ok(client)
336 0 : },
337 0 : )
338 0 : .await?;
339 : }
340 :
341 0 : drop(client_conn);
342 0 :
343 0 : Ok::<(), anyhow::Error>(())
344 0 : }
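// The per-database fan-out above follows a standard tokio pattern: clone an
// `Arc<Semaphore>` into each spawned task and `acquire()` a permit before doing
// any real work. A minimal, self-contained sketch of that pattern (the task body
// and names are placeholders, not this module's logic):
//
//     use std::sync::Arc;
//     use tokio::sync::Semaphore;
//
//     async fn run_bounded(dbnames: Vec<String>, limit: usize) -> anyhow::Result<()> {
//         let sem = Arc::new(Semaphore::new(limit));
//         let handles: Vec<_> = dbnames
//             .into_iter()
//             .map(|dbname| {
//                 let sem = sem.clone();
//                 tokio::spawn(async move {
//                     let _permit = sem.acquire().await?; // held until the task finishes
//                     // ... connect to `dbname` and run its sub-phases here ...
//                     let _ = dbname;
//                     Ok::<(), anyhow::Error>(())
//                 })
//             })
//             .collect();
//         for handle in handles {
//             handle.await??;
//         }
//         Ok(())
//     }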
345 :
346 : /// Choose how many concurrent connections to use for applying the spec changes.
347 0 : pub fn max_service_connections(
348 0 : &self,
349 0 : compute_state: &ComputeState,
350 0 : spec: &ComputeSpec,
351 0 : ) -> usize {
352 0 : // If the cluster is in the Init state we don't have to deal with user connections,
353 0 : // and could thus use all `max_connections` connection slots. However, that's generally
354 0 : // not very efficient, so we still limit it to a smaller number.
355 0 : if compute_state.status == ComputeStatus::Init {
356 : // If the settings contain 'max_connections', use that as template
357 0 : if let Some(config) = spec.cluster.settings.find("max_connections") {
358 0 : config.parse::<usize>().ok()
359 : } else {
360 : // Otherwise, try to find the setting in the postgresql_conf string
361 0 : spec.cluster
362 0 : .postgresql_conf
363 0 : .iter()
364 0 : .flat_map(|conf| conf.split("\n"))
365 0 : .filter_map(|line| {
366 0 : if !line.contains("max_connections") {
367 0 : return None;
368 0 : }
369 :
370 0 : let (key, value) = line.split_once("=")?;
371 0 : let key = key
372 0 : .trim_start_matches(char::is_whitespace)
373 0 : .trim_end_matches(char::is_whitespace);
374 0 :
375 0 : let value = value
376 0 : .trim_start_matches(char::is_whitespace)
377 0 : .trim_end_matches(char::is_whitespace);
378 0 :
379 0 : if key != "max_connections" {
380 0 : return None;
381 0 : }
382 0 :
383 0 : value.parse::<usize>().ok()
384 0 : })
385 0 : .next()
386 : }
387 : // If max_connections is present, use at most 1/3rd of that.
388 : // When max_connections is lower than 30, try to use at least 10 connections, but
389 : // never more than max_connections.
390 0 : .map(|limit| match limit {
391 0 : 0..10 => limit,
392 0 : 10..30 => 10,
393 0 : 30.. => limit / 3,
394 0 : })
395 0 : // If we didn't find max_connections, default to 10 concurrent connections.
396 0 : .unwrap_or(10)
397 : } else {
398 : // state == Running
399 : // Because the cluster is already in the Running state, we should assume users are
400 : // already connected to the cluster, and high concurrency could negatively
401 : // impact user connectivity. Therefore, we can limit concurrency to the number of
402 : // reserved superuser connections, which users wouldn't be able to use anyway.
403 0 : spec.cluster
404 0 : .settings
405 0 : .find("superuser_reserved_connections")
406 0 : .iter()
407 0 : .filter_map(|val| val.parse::<usize>().ok())
408 0 : .map(|val| if val > 1 { val - 1 } else { 1 })
409 0 : .last()
410 0 : .unwrap_or(3)
411 : }
412 0 : }
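// Worked examples of the Init-state mapping above (the ranges are half-open):
//   max_connections = 8   -> 8  (0..10:  fewer than 10 slots, use them all)
//   max_connections = 20  -> 10 (10..30: clamp to 10)
//   max_connections = 100 -> 33 (30..:   one third, integer division)
//   not configured        -> 10 (default)
// In the Running state, superuser_reserved_connections = 4 yields 3 connections,
// and the default when that setting is absent is also 3.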
413 : }
414 :
415 : #[derive(Clone)]
416 : pub enum DB {
417 : SystemDB,
418 : UserDB(Database),
419 : }
420 :
421 : impl DB {
422 0 : pub fn new(db: Database) -> DB {
423 0 : Self::UserDB(db)
424 0 : }
425 :
426 0 : pub fn is_owned_by(&self, role: &PgIdent) -> bool {
427 0 : match self {
428 0 : DB::SystemDB => false,
429 0 : DB::UserDB(db) => &db.owner == role,
430 : }
431 0 : }
432 : }
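// Usage sketch (illustrative only), assuming a `Database` value `database` whose
// owner is "alice"; `PgIdent` is used the same way as in DeleteDBRoleReferences below:
//
//     let db = DB::new(database);
//     assert!(db.is_owned_by(&PgIdent::from("alice")));
//     assert!(!DB::SystemDB.is_owned_by(&PgIdent::from("alice")));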
433 :
434 : impl Debug for DB {
435 0 : fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
436 0 : match self {
437 0 : DB::SystemDB => f.debug_tuple("SystemDB").finish(),
438 0 : DB::UserDB(db) => f.debug_tuple("UserDB").field(&db.name).finish(),
439 : }
440 0 : }
441 : }
442 :
443 : #[derive(Copy, Clone, Debug)]
444 : pub enum PerDatabasePhase {
445 : DeleteDBRoleReferences,
446 : ChangeSchemaPerms,
447 : HandleAnonExtension,
448 : /// This is a shared phase, used for both i) dropping dangling LR subscriptions
449 : /// before dropping the DB, and ii) dropping all subscriptions after creating
450 : /// a fresh branch.
451 : /// N.B. we will skip all DBs that are not present in Postgres, invalid, or
452 : /// have `datallowconn = false` (`restrict_conn`).
453 : DropLogicalSubscriptions,
454 : }
455 :
456 : #[derive(Clone, Debug)]
457 : pub enum ApplySpecPhase {
458 : CreateSuperUser,
459 : DropInvalidDatabases,
460 : RenameRoles,
461 : CreateAndAlterRoles,
462 : RenameAndDeleteDatabases,
463 : CreateAndAlterDatabases,
464 : CreateSchemaNeon,
465 : RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
466 : HandleOtherExtensions,
467 : HandleNeonExtension,
468 : CreateAvailabilityCheck,
469 : DropRoles,
470 : FinalizeDropLogicalSubscriptions,
471 : }
472 :
473 : pub struct Operation {
474 : pub query: String,
475 : pub comment: Option<String>,
476 : }
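// `get_operations` below yields these as `Box<dyn Iterator<Item = Operation> + Send>`,
// usually built from `std::iter::once`/`empty` or a mapped collection. A minimal
// sketch of a hypothetical single-statement phase:
//
//     use std::iter::once;
//
//     fn example_phase_ops() -> Box<dyn Iterator<Item = Operation> + Send> {
//         Box::new(once(Operation {
//             query: String::from("SELECT 1"),
//             comment: Some(String::from("no-op availability probe")),
//         }))
//     }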
477 :
478 : pub struct MutableApplyContext {
479 : pub roles: HashMap<String, Role>,
480 : pub dbs: HashMap<String, Database>,
481 : }
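// The context is shared across all per-database tasks behind `Arc<RwLock<_>>`, as
// done in `apply_spec_sql` above. A minimal sketch of constructing it (in the real
// code the maps come from `get_existing_roles_async`/`get_existing_dbs_async`):
//
//     let ctx = Arc::new(RwLock::new(MutableApplyContext {
//         roles: HashMap::new(),
//         dbs: HashMap::new(),
//     }));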
482 :
483 : /// Apply the operations that belong to the given spec apply phase.
484 : ///
485 : /// Commands within a single phase are executed in order of Iterator yield.
486 : /// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
487 : /// indicated by its `db` field, and can share a single client for all changes
488 : /// to that database.
489 : ///
490 : /// Notes:
491 : /// - Commands are pipelined, and thus may cause incomplete apply if one
492 : /// command of many fails.
493 : /// - Failing commands will fail the phase's apply step once the return value
494 : /// is processed.
495 : /// - No timeouts have (yet) been implemented.
496 : /// - The caller is responsible for limiting and/or applying concurrency.
497 0 : pub async fn apply_operations<'a, Fut, F>(
498 0 : spec: Arc<ComputeSpec>,
499 0 : ctx: Arc<RwLock<MutableApplyContext>>,
500 0 : jwks_roles: Arc<HashSet<String>>,
501 0 : apply_spec_phase: ApplySpecPhase,
502 0 : client: F,
503 0 : ) -> Result<()>
504 0 : where
505 0 : F: FnOnce() -> Fut,
506 0 : Fut: Future<Output = Result<&'a Client>>,
507 0 : {
508 0 : debug!("Starting phase {:?}", &apply_spec_phase);
509 0 : let span = info_span!("db_apply_changes", phase=?apply_spec_phase);
510 0 : let span2 = span.clone();
511 0 : async move {
512 0 : debug!("Processing phase {:?}", &apply_spec_phase);
513 0 : let ctx = ctx;
514 :
515 0 : let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase)
516 0 : .await?
517 0 : .peekable();
518 0 :
519 0 : // Return (and by doing so, skip requesting the PostgreSQL client) if
520 0 : // we don't have any operations scheduled.
521 0 : if ops.peek().is_none() {
522 0 : return Ok(());
523 0 : }
524 :
525 0 : let client = client().await?;
526 :
527 0 : debug!("Applying phase {:?}", &apply_spec_phase);
528 :
529 0 : let active_queries = ops
530 0 : .map(|op| {
531 0 : let Operation { comment, query } = op;
532 0 : let inspan = match comment {
533 0 : None => span.clone(),
534 0 : Some(comment) => info_span!("phase {}: {}", comment),
535 : };
536 :
537 0 : async {
538 0 : let query = query;
539 0 : let res = client.simple_query(&query).await;
540 0 : debug!(
541 0 : "{} {}",
542 0 : if res.is_ok() {
543 0 : "successfully executed"
544 : } else {
545 0 : "failed to execute"
546 : },
547 : query
548 : );
549 0 : res
550 0 : }
551 0 : .instrument(inspan)
552 0 : })
553 0 : .collect::<Vec<_>>();
554 0 :
555 0 : drop(ctx);
556 :
557 0 : for it in join_all(active_queries).await {
558 0 : drop(it?);
559 : }
560 :
561 0 : debug!("Completed phase {:?}", &apply_spec_phase);
562 :
563 0 : Ok(())
564 0 : }
565 0 : .instrument(span2)
566 0 : .await
567 0 : }
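// Illustrative sketch of a call site, mirroring the `|| async { Ok(&client) }`
// closures used in `apply_spec_sql` above; `spec`, `ctx`, `jwks_roles`, and
// `client` are assumed to already exist with the types used in this file:
//
//     apply_operations(
//         spec.clone(),
//         ctx.clone(),
//         jwks_roles.clone(),
//         ApplySpecPhase::CreateSchemaNeon,
//         || async { Ok(&client) },
//     )
//     .await?;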
568 :
569 : /// Create a stream of operations to be executed for the given phase of applying
570 : /// changes.
571 : ///
572 : /// In the future we may generate a single stream of changes and then
573 : /// sort/merge/batch the execution, but for now this is a nice way to improve
574 : /// the batching behavior of the commands.
575 0 : async fn get_operations<'a>(
576 0 : spec: &'a ComputeSpec,
577 0 : ctx: &'a RwLock<MutableApplyContext>,
578 0 : jwks_roles: &'a HashSet<String>,
579 0 : apply_spec_phase: &'a ApplySpecPhase,
580 0 : ) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
581 0 : match apply_spec_phase {
582 : ApplySpecPhase::CreateSuperUser => {
583 0 : let query = construct_superuser_query(spec);
584 0 :
585 0 : Ok(Box::new(once(Operation {
586 0 : query,
587 0 : comment: None,
588 0 : })))
589 : }
590 : ApplySpecPhase::DropInvalidDatabases => {
591 0 : let mut ctx = ctx.write().await;
592 0 : let databases = &mut ctx.dbs;
593 0 :
594 0 : let keys: Vec<_> = databases
595 0 : .iter()
596 0 : .filter(|(_, db)| db.invalid)
597 0 : .map(|(dbname, _)| dbname.clone())
598 0 : .collect();
599 0 :
600 0 : // After recent commit in Postgres, interrupted DROP DATABASE
601 0 : // leaves the database in the invalid state. According to the
602 0 : // commit message, the only option for user is to drop it again.
603 0 : // See:
604 0 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
605 0 : //
606 0 : // Postgres Neon extension is done the way, that db is de-registered
607 0 : // in the control plane metadata only after it is dropped. So there is
608 0 : // a chance that it still thinks that the db should exist. This means
609 0 : // that it will be re-created by the `CreateDatabases` phase. This
610 0 : // is fine, as user can just drop the table again (in vanilla
611 0 : // Postgres they would need to do the same).
612 0 : let operations = keys
613 0 : .into_iter()
614 0 : .filter_map(move |dbname| ctx.dbs.remove(&dbname))
615 0 : .map(|db| Operation {
616 0 : query: format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote()),
617 0 : comment: Some(format!("Dropping invalid database {}", db.name)),
618 0 : });
619 0 :
620 0 : Ok(Box::new(operations))
621 : }
622 : ApplySpecPhase::RenameRoles => {
623 0 : let mut ctx = ctx.write().await;
624 :
625 0 : let operations = spec
626 0 : .delta_operations
627 0 : .iter()
628 0 : .flatten()
629 0 : .filter(|op| op.action == "rename_role")
630 0 : .filter_map(move |op| {
631 0 : let roles = &mut ctx.roles;
632 0 :
633 0 : if roles.contains_key(op.name.as_str()) {
634 0 : None
635 : } else {
636 0 : let new_name = op.new_name.as_ref().unwrap();
637 0 : let mut role = roles.remove(op.name.as_str()).unwrap();
638 0 :
639 0 : role.name = new_name.clone();
640 0 : role.encrypted_password = None;
641 0 : roles.insert(role.name.clone(), role);
642 0 :
643 0 : Some(Operation {
644 0 : query: format!(
645 0 : "ALTER ROLE {} RENAME TO {}",
646 0 : op.name.pg_quote(),
647 0 : new_name.pg_quote()
648 0 : ),
649 0 : comment: Some(format!("renaming role '{}' to '{}'", op.name, new_name)),
650 0 : })
651 : }
652 0 : });
653 0 :
654 0 : Ok(Box::new(operations))
655 : }
656 : ApplySpecPhase::CreateAndAlterRoles => {
657 0 : let mut ctx = ctx.write().await;
658 :
659 0 : let operations = spec.cluster.roles
660 0 : .iter()
661 0 : .filter_map(move |role| {
662 0 : let roles = &mut ctx.roles;
663 0 : let db_role = roles.get(&role.name);
664 0 :
665 0 : match db_role {
666 0 : Some(db_role) => {
667 0 : if db_role.encrypted_password != role.encrypted_password {
668 : // This can be run on /every/ role! Not just ones created through the console.
669 : // This means that if you add some funny ALTER here that adds a permission,
670 : // this will get run even on user-created roles! This will result in different
671 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
672 : // now only grants LOGIN and changes the password. Please do not allow this branch
673 : // to do anything silly.
674 0 : Some(Operation {
675 0 : query: format!(
676 0 : "ALTER ROLE {} {}",
677 0 : role.name.pg_quote(),
678 0 : role.to_pg_options(),
679 0 : ),
680 0 : comment: None,
681 0 : })
682 : } else {
683 0 : None
684 : }
685 : }
686 : None => {
687 0 : let query = if !jwks_roles.contains(role.name.as_str()) {
688 0 : format!(
689 0 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}",
690 0 : role.name.pg_quote(),
691 0 : role.to_pg_options(),
692 0 : )
693 : } else {
694 0 : format!(
695 0 : "CREATE ROLE {} {}",
696 0 : role.name.pg_quote(),
697 0 : role.to_pg_options(),
698 0 : )
699 : };
700 0 : Some(Operation {
701 0 : query,
702 0 : comment: Some(format!("creating role {}", role.name)),
703 0 : })
704 : }
705 : }
706 0 : });
707 0 :
708 0 : Ok(Box::new(operations))
709 : }
710 : ApplySpecPhase::RenameAndDeleteDatabases => {
711 0 : let mut ctx = ctx.write().await;
712 :
713 0 : let operations = spec
714 0 : .delta_operations
715 0 : .iter()
716 0 : .flatten()
717 0 : .filter_map(move |op| {
718 0 : let databases = &mut ctx.dbs;
719 0 : match op.action.as_str() {
720 0 : // We do not check whether the DB exists or not;
721 0 : // Postgres will take care of it for us.
722 0 : "delete_db" => {
723 : // In Postgres we can't drop a database if it is a template.
724 : // So we need to unset the template flag first, but it could
725 : // be a retry, so we could've already dropped the database.
726 : // Check that the database exists first to make it idempotent.
727 0 : let unset_template_query: String = format!(
728 0 : include_str!("sql/unset_template_for_drop_dbs.sql"),
729 0 : datname_str = escape_literal(&op.name),
730 0 : datname = &op.name.pg_quote()
731 0 : );
732 0 :
733 0 : // Use FORCE to drop database even if there are active connections.
734 0 : // We run this from `cloud_admin`, so it should have enough privileges.
735 0 : //
736 0 : // NB: there could be other db states, which prevent us from dropping
737 0 : // the database. For example, if db is used by any active subscription
738 0 : // or replication slot.
739 0 : // Such cases are handled in the DropLogicalSubscriptions
740 0 : // phase. We do all the cleanup before actually dropping the database.
741 0 : let drop_db_query: String = format!(
742 0 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
743 0 : &op.name.pg_quote()
744 0 : );
745 0 :
746 0 : databases.remove(&op.name);
747 0 :
748 0 : Some(vec![
749 0 : Operation {
750 0 : query: unset_template_query,
751 0 : comment: Some(format!(
752 0 : "optionally clearing template flags for DB {}",
753 0 : op.name,
754 0 : )),
755 0 : },
756 0 : Operation {
757 0 : query: drop_db_query,
758 0 : comment: Some(format!("deleting database {}", op.name,)),
759 0 : },
760 0 : ])
761 : }
762 0 : "rename_db" => {
763 0 : if let Some(mut db) = databases.remove(&op.name) {
764 : // update state of known databases
765 0 : let new_name = op.new_name.as_ref().unwrap();
766 0 : db.name = new_name.clone();
767 0 : databases.insert(db.name.clone(), db);
768 0 :
769 0 : Some(vec![Operation {
770 0 : query: format!(
771 0 : "ALTER DATABASE {} RENAME TO {}",
772 0 : op.name.pg_quote(),
773 0 : new_name.pg_quote(),
774 0 : ),
775 0 : comment: Some(format!(
776 0 : "renaming database '{}' to '{}'",
777 0 : op.name, new_name
778 0 : )),
779 0 : }])
780 : } else {
781 0 : None
782 : }
783 : }
784 0 : _ => None,
785 : }
786 0 : })
787 0 : .flatten();
788 0 :
789 0 : Ok(Box::new(operations))
790 : }
791 : ApplySpecPhase::CreateAndAlterDatabases => {
792 0 : let mut ctx = ctx.write().await;
793 :
794 0 : let operations = spec
795 0 : .cluster
796 0 : .databases
797 0 : .iter()
798 0 : .filter_map(move |db| {
799 0 : let databases = &mut ctx.dbs;
800 0 : if let Some(edb) = databases.get_mut(&db.name) {
801 0 : let change_owner = if edb.owner.starts_with('"') {
802 0 : db.owner.pg_quote() != edb.owner
803 : } else {
804 0 : db.owner != edb.owner
805 : };
806 :
807 0 : edb.owner = db.owner.clone();
808 0 :
809 0 : if change_owner {
810 0 : Some(vec![Operation {
811 0 : query: format!(
812 0 : "ALTER DATABASE {} OWNER TO {}",
813 0 : db.name.pg_quote(),
814 0 : db.owner.pg_quote()
815 0 : ),
816 0 : comment: Some(format!(
817 0 : "changing database owner of database {} to {}",
818 0 : db.name, db.owner
819 0 : )),
820 0 : }])
821 : } else {
822 0 : None
823 : }
824 : } else {
825 0 : databases.insert(db.name.clone(), db.clone());
826 0 :
827 0 : Some(vec![
828 0 : Operation {
829 0 : query: format!(
830 0 : "CREATE DATABASE {} {}",
831 0 : db.name.pg_quote(),
832 0 : db.to_pg_options(),
833 0 : ),
834 0 : comment: None,
835 0 : },
836 0 : Operation {
837 0 : query: format!(
838 0 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
839 0 : db.name.pg_quote()
840 0 : ),
841 0 : comment: None,
842 0 : },
843 0 : ])
844 : }
845 0 : })
846 0 : .flatten();
847 0 :
848 0 : Ok(Box::new(operations))
849 : }
850 0 : ApplySpecPhase::CreateSchemaNeon => Ok(Box::new(once(Operation {
851 0 : query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
852 0 : comment: Some(String::from(
853 0 : "create schema for neon extension and utils tables",
854 0 : )),
855 0 : }))),
856 0 : ApplySpecPhase::RunInEachDatabase { db, subphase } => {
857 : // Do some checks that the user DB exists and we can access it.
858 : //
859 : // During phases like DropLogicalSubscriptions and DeleteDBRoleReferences,
860 : // which happen before dropping the DB, the current run could be a retry,
861 : // so it's a valid case when the DB is already absent. The case of
862 : // `pg_database.datallowconn = false`/`restrict_conn` is a bit tricky, as
863 : // in theory the user can have some dangling objects there, so we will fail
864 : // at the actual drop later. Yet, to fix that in the current code we would
865 : // need to ALTER DATABASE and then check back, but that is even more
866 : // invasive, so that's not what we really want to do here.
867 : //
868 : // For ChangeSchemaPerms, skipping DBs we cannot access is totally fine.
869 0 : if let DB::UserDB(db) = db {
870 0 : let databases = &ctx.read().await.dbs;
871 :
872 0 : let edb = match databases.get(&db.name) {
873 0 : Some(edb) => edb,
874 : None => {
875 0 : warn!(
876 0 : "skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL",
877 : subphase, db.name
878 : );
879 0 : return Ok(Box::new(empty()));
880 : }
881 : };
882 :
883 0 : if edb.restrict_conn || edb.invalid {
884 0 : warn!(
885 0 : "skipping RunInEachDatabase phase {:?}, database {} is (restrict_conn={}, invalid={})",
886 : subphase, db.name, edb.restrict_conn, edb.invalid
887 : );
888 0 : return Ok(Box::new(empty()));
889 0 : }
890 0 : }
891 :
892 0 : match subphase {
893 : PerDatabasePhase::DropLogicalSubscriptions => {
894 0 : match &db {
895 0 : DB::UserDB(db) => {
896 0 : let drop_subscription_query: String = format!(
897 0 : include_str!("sql/drop_subscriptions.sql"),
898 0 : datname_str = escape_literal(&db.name),
899 0 : );
900 0 :
901 0 : let operations = vec![Operation {
902 0 : query: drop_subscription_query,
903 0 : comment: Some(format!(
904 0 : "optionally dropping subscriptions for DB {}",
905 0 : db.name,
906 0 : )),
907 0 : }]
908 0 : .into_iter();
909 0 :
910 0 : Ok(Box::new(operations))
911 : }
912 : // skip this cleanup for the system databases
913 : // because users can't drop them
914 0 : DB::SystemDB => Ok(Box::new(empty())),
915 : }
916 : }
917 : PerDatabasePhase::DeleteDBRoleReferences => {
918 0 : let ctx = ctx.read().await;
919 :
920 0 : let operations =
921 0 : spec.delta_operations
922 0 : .iter()
923 0 : .flatten()
924 0 : .filter(|op| op.action == "delete_role")
925 0 : .filter_map(move |op| {
926 0 : if db.is_owned_by(&op.name) {
927 0 : return None;
928 0 : }
929 0 : if !ctx.roles.contains_key(&op.name) {
930 0 : return None;
931 0 : }
932 0 : let quoted = op.name.pg_quote();
933 0 : let new_owner = match &db {
934 0 : DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
935 0 : DB::UserDB(db) => db.owner.pg_quote(),
936 : };
937 :
938 0 : Some(vec![
939 0 : // This will reassign all dependent objects to the db owner
940 0 : Operation {
941 0 : query: format!(
942 0 : "REASSIGN OWNED BY {} TO {}",
943 0 : quoted, new_owner,
944 0 : ),
945 0 : comment: None,
946 0 : },
947 0 : // Revoke some potentially blocking privileges (Neon-specific currently)
948 0 : Operation {
949 0 : query: format!(
950 0 : include_str!("sql/pre_drop_role_revoke_privileges.sql"),
951 0 : role_name = quoted,
952 0 : ),
953 0 : comment: None,
954 0 : },
955 0 : // This now will only drop privileges of the role
956 0 : // TODO: this is obviously not 100% true because of the above case,
957 0 : // there could be still some privileges that are not revoked. Maybe this
958 0 : // only drops privileges that were granted *by this* role, not *to this* role,
959 0 : // but this has to be checked.
960 0 : Operation {
961 0 : query: format!("DROP OWNED BY {}", quoted),
962 0 : comment: None,
963 0 : },
964 0 : ])
965 0 : })
966 0 : .flatten();
967 0 :
968 0 : Ok(Box::new(operations))
969 : }
970 : PerDatabasePhase::ChangeSchemaPerms => {
971 0 : let db = match &db {
972 : // ignore schema permissions on the system database
973 0 : DB::SystemDB => return Ok(Box::new(empty())),
974 0 : DB::UserDB(db) => db,
975 0 : };
976 0 :
977 0 : let operations = vec![
978 0 : Operation {
979 0 : query: format!(
980 0 : include_str!("sql/set_public_schema_owner.sql"),
981 0 : db_owner = db.owner.pg_quote()
982 0 : ),
983 0 : comment: None,
984 0 : },
985 0 : Operation {
986 0 : query: String::from(include_str!("sql/default_grants.sql")),
987 0 : comment: None,
988 0 : },
989 0 : ]
990 0 : .into_iter();
991 0 :
992 0 : Ok(Box::new(operations))
993 : }
994 : // TODO: remove this completely https://github.com/neondatabase/cloud/issues/22663
995 : PerDatabasePhase::HandleAnonExtension => {
996 : // Only install Anon into user databases
997 0 : let db = match &db {
998 0 : DB::SystemDB => return Ok(Box::new(empty())),
999 0 : DB::UserDB(db) => db,
1000 0 : };
1001 0 : // Never install Anon when it's not enabled as feature
1002 0 : if !spec.features.contains(&ComputeFeature::AnonExtension) {
1003 0 : return Ok(Box::new(empty()));
1004 0 : }
1005 0 :
1006 0 : // Only install Anon when it's added in preload libraries
1007 0 : let opt_libs = spec.cluster.settings.find("shared_preload_libraries");
1008 :
1009 0 : let libs = match opt_libs {
1010 0 : Some(libs) => libs,
1011 0 : None => return Ok(Box::new(empty())),
1012 : };
1013 :
1014 0 : if !libs.contains("anon") {
1015 0 : return Ok(Box::new(empty()));
1016 0 : }
1017 0 :
1018 0 : let db_owner = db.owner.pg_quote();
1019 0 :
1020 0 : let operations = vec![
1021 0 : // Create anon extension if this compute needs it
1022 0 : // Users cannot create it themselves, because superuser is required.
1023 0 : Operation {
1024 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS anon CASCADE"),
1025 0 : comment: Some(String::from("creating anon extension")),
1026 0 : },
1027 0 : // Initialize anon extension
1028 0 : // This also requires superuser privileges, so users cannot do it themselves.
1029 0 : Operation {
1030 0 : query: String::from("SELECT anon.init()"),
1031 0 : comment: Some(String::from("initializing anon extension data")),
1032 0 : },
1033 0 : Operation {
1034 0 : query: format!("GRANT ALL ON SCHEMA anon TO {}", db_owner),
1035 0 : comment: Some(String::from(
1036 0 : "granting anon extension schema permissions",
1037 0 : )),
1038 0 : },
1039 0 : Operation {
1040 0 : query: format!(
1041 0 : "GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}",
1042 0 : db_owner
1043 0 : ),
1044 0 : comment: Some(String::from(
1045 0 : "granting anon extension schema functions permissions",
1046 0 : )),
1047 0 : },
1048 0 : // We need this because some functions are defined as SECURITY DEFINER.
1049 0 : // In Postgres, SECURITY DEFINER functions are executed with the privileges
1050 0 : // of the owner.
1051 0 : // In the anon extension this is needed to access some GUCs, which are only
1052 0 : // accessible to the superuser. But we've patched Postgres to allow db_owner
1053 0 : // to access them as well, so we need to change the owner of these functions to db_owner.
1054 0 : Operation {
1055 0 : query: format!(
1056 0 : include_str!("sql/anon_ext_fn_reassign.sql"),
1057 0 : db_owner = db_owner,
1058 0 : ),
1059 0 : comment: Some(String::from(
1060 0 : "change anon extension functions owner to database_owner",
1061 0 : )),
1062 0 : },
1063 0 : Operation {
1064 0 : query: format!(
1065 0 : "GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}",
1066 0 : db_owner,
1067 0 : ),
1068 0 : comment: Some(String::from(
1069 0 : "granting anon extension tables permissions",
1070 0 : )),
1071 0 : },
1072 0 : Operation {
1073 0 : query: format!(
1074 0 : "GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}",
1075 0 : db_owner,
1076 0 : ),
1077 0 : comment: Some(String::from(
1078 0 : "granting anon extension sequences permissions",
1079 0 : )),
1080 0 : },
1081 0 : ]
1082 0 : .into_iter();
1083 0 :
1084 0 : Ok(Box::new(operations))
1085 : }
1086 : }
1087 : }
1088 : // Interestingly, we only install pg_stat_statements in the main database,
1089 : // even when it's preloaded.
1090 : ApplySpecPhase::HandleOtherExtensions => {
1091 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1092 0 : if libs.contains("pg_stat_statements") {
1093 0 : return Ok(Box::new(once(Operation {
1094 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
1095 0 : comment: Some(String::from("create system extensions")),
1096 0 : })));
1097 0 : }
1098 0 : }
1099 0 : Ok(Box::new(empty()))
1100 : }
1101 : ApplySpecPhase::HandleNeonExtension => {
1102 0 : let operations = vec![
1103 0 : Operation {
1104 0 : query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
1105 0 : comment: Some(String::from(
1106 0 : "init: install the extension if not already installed",
1107 0 : )),
1108 0 : },
1109 0 : Operation {
1110 0 : query: String::from(
1111 0 : "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
1112 0 : ),
1113 0 : comment: Some(String::from("compat/fix: make neon relocatable")),
1114 0 : },
1115 0 : Operation {
1116 0 : query: String::from("ALTER EXTENSION neon SET SCHEMA neon"),
1117 0 : comment: Some(String::from("compat/fix: alter neon extension schema")),
1118 0 : },
1119 0 : Operation {
1120 0 : query: String::from("ALTER EXTENSION neon UPDATE"),
1121 0 : comment: Some(String::from("compat/update: update neon extension version")),
1122 0 : },
1123 0 : ]
1124 0 : .into_iter();
1125 0 :
1126 0 : Ok(Box::new(operations))
1127 : }
1128 0 : ApplySpecPhase::CreateAvailabilityCheck => Ok(Box::new(once(Operation {
1129 0 : query: String::from(include_str!("sql/add_availabilitycheck_tables.sql")),
1130 0 : comment: None,
1131 0 : }))),
1132 : ApplySpecPhase::DropRoles => {
1133 0 : let operations = spec
1134 0 : .delta_operations
1135 0 : .iter()
1136 0 : .flatten()
1137 0 : .filter(|op| op.action == "delete_role")
1138 0 : .map(|op| Operation {
1139 0 : query: format!("DROP ROLE IF EXISTS {}", op.name.pg_quote()),
1140 0 : comment: None,
1141 0 : });
1142 0 :
1143 0 : Ok(Box::new(operations))
1144 : }
1145 0 : ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
1146 0 : query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
1147 0 : comment: None,
1148 0 : }))),
1149 : }
1150 0 : }
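// The pattern for adding a new phase, as visible above: add a variant to
// `ApplySpecPhase` (or `PerDatabasePhase`), schedule it in the phase lists in
// `apply_spec_sql`, and give it a match arm here that returns a boxed iterator
// of `Operation`s built from `once(..)`, `empty()`, or a mapped collection.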