use std::collections::HashSet;
use std::fs::File;
use std::path::Path;
use std::str::FromStr;

use anyhow::{anyhow, bail, Context, Result};
use postgres::config::Config;
use postgres::{Client, NoTls};
use reqwest::StatusCode;
use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};

use crate::config;
use crate::logger::inlinify;
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;

use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
use compute_api::spec::{ComputeSpec, PgIdent, Role};

// Perform a control plane request and return the response, if any. On error it
// returns a bool flag indicating whether it makes sense to retry the request
// and a string with the error message.
fn do_control_plane_request(
    uri: &str,
    jwt: &str,
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
    let resp = reqwest::blocking::Client::new()
        .get(uri)
        .header("Authorization", format!("Bearer {}", jwt))
        .send()
        .map_err(|e| {
            (
                true,
                format!("could not perform spec request to control plane: {}", e),
            )
        })?;

    match resp.status() {
        StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
            Ok(spec_resp) => Ok(spec_resp),
            Err(e) => Err((
                true,
                format!("could not deserialize control plane response: {}", e),
            )),
        },
        StatusCode::SERVICE_UNAVAILABLE => {
            Err((true, "control plane is temporarily unavailable".to_string()))
        }
        StatusCode::BAD_GATEWAY => {
            // We have a problem with intermittent 502 errors now
            // https://github.com/neondatabase/cloud/issues/2353
            // It's fine to retry a GET request in this case.
            Err((true, "control plane request failed with 502".to_string()))
        }
        // Another code, likely 500 or 404, means that the compute is unknown to the control plane
        // or some internal failure happened. It doesn't make much sense to retry in this case.
        _ => Err((
            false,
            format!(
                "unexpected control plane response status code: {}",
                resp.status()
            ),
        )),
    }
}

/// Request the spec from the control plane by compute_id. If the `NEON_CONTROL_PLANE_TOKEN`
/// env variable is set, it is used for authorization.
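///
/// # Example
///
/// A rough usage sketch; the base URI and compute id below are placeholders,
/// not real endpoints (the block is `ignore`d so it is not compiled as a doctest):
///
/// ```ignore
/// let maybe_spec = get_spec_from_control_plane(
///     "http://localhost:9095",       // hypothetical control plane address
///     "compute-quiet-breeze-123456", // hypothetical compute id
/// )?;
/// match maybe_spec {
///     Some(spec) => println!("got spec with {} roles", spec.cluster.roles.len()),
///     None => println!("compute is not attached yet (Empty state)"),
/// }
/// ```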
pub fn get_spec_from_control_plane(
    base_uri: &str,
    compute_id: &str,
) -> Result<Option<ComputeSpec>> {
    let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
    let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
        Ok(v) => v,
        Err(_) => "".to_string(),
    };
    let mut attempt = 1;
    let mut spec: Result<Option<ComputeSpec>> = Ok(None);

    info!("getting spec from control plane: {}", cp_uri);

    // Make 3 attempts to get the spec from the control plane using the following logic:
    // - network error -> retry
    // - compute id is unknown or any other error -> bail out
    // - no spec for compute yet (Empty state) -> return Ok(None)
    // - got spec -> return Ok(Some(spec))
    while attempt < 4 {
        spec = match do_control_plane_request(&cp_uri, &jwt) {
            Ok(spec_resp) => match spec_resp.status {
                ControlPlaneComputeStatus::Empty => Ok(None),
                ControlPlaneComputeStatus::Attached => {
                    if let Some(spec) = spec_resp.spec {
                        Ok(Some(spec))
                    } else {
                        bail!("compute is attached, but spec is empty")
                    }
                }
            },
            Err((retry, msg)) => {
                if retry {
                    Err(anyhow!(msg))
                } else {
                    bail!(msg);
                }
            }
        };

        if let Err(e) = &spec {
            error!("attempt {} to get spec failed with: {}", attempt, e);
        } else {
            return spec;
        }

        attempt += 1;
        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    // All attempts failed, return the error.
    spec
}

/// Check `pg_hba.conf` and update it, if needed, to allow external connections.
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
    // XXX: consider making it a part of spec.json
    info!("checking pg_hba.conf");
    let pghba_path = pgdata_path.join("pg_hba.conf");

    if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
        info!("updated pg_hba.conf to allow external connections");
    } else {
        info!("pg_hba.conf is up-to-date");
    }

    Ok(())
}

/// Create a `standby.signal` file.
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
    // XXX: consider making it a part of spec.json
    info!("adding standby.signal");
    let signalfile = pgdata_path.join("standby.signal");

    if !signalfile.exists() {
        File::create(signalfile)?;
        info!("created standby.signal");
    } else {
        info!("reused pre-existing standby.signal");
    }
    Ok(())
}

/// The compute could be unexpectedly shut down, for example, while a database
/// was being dropped. This leaves the database in an invalid state, which
/// prevents creating a new db with the same name. This function cleans that up
/// before proceeding with catalog updates. All possible future cleanup
/// operations may go here too.
#[instrument(skip_all)]
pub fn cleanup_instance(client: &mut Client) -> Result<()> {
    let existing_dbs = get_existing_dbs(client)?;

    for (_, db) in existing_dbs {
        if db.invalid {
            // After a recent commit in Postgres, an interrupted DROP DATABASE
            // leaves the database in an invalid state. According to the
            // commit message, the only option for the user is to drop it again.
            // See:
            // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
            //
            // The Neon Postgres extension is designed so that a db is de-registered
            // in the control plane metadata only after it is dropped. So there is
            // a chance that the control plane still thinks the db should exist. This means
            // that it will be re-created by `handle_databases()`. Yet, that's fine,
            // as the user can just repeat the drop (in vanilla Postgres they would need
            // to do the same, btw).
            let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
            info!("dropping invalid database {}", db.name);
            client.execute(query.as_str(), &[])?;
        }
    }

    Ok(())
}

/// Given a cluster spec and an open client connection, handle role creation,
/// deletion and updates.
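///
/// # Example
///
/// A rough sketch of how this is driven; the connection string below is a
/// placeholder, not a real endpoint (the block is `ignore`d so it is not
/// compiled as a doctest):
///
/// ```ignore
/// use postgres::{Client, NoTls};
///
/// // `spec` obtained earlier, e.g. via `get_spec_from_control_plane()`.
/// let mut client = Client::connect("host=localhost user=cloud_admin dbname=postgres", NoTls)?;
/// handle_roles(&spec, &mut client)?;
/// ```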
#[instrument(skip_all)]
pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
    let mut xact = client.transaction()?;
    let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;

    let mut jwks_roles = HashSet::new();
    if let Some(local_proxy) = &spec.local_proxy_config {
        for jwks_setting in local_proxy.jwks.iter().flatten() {
            for role_name in &jwks_setting.role_names {
                jwks_roles.insert(role_name.clone());
            }
        }
    }

    // Print a list of existing Postgres roles (only if INFO level is enabled)
    if span_enabled!(Level::INFO) {
        let mut vec = Vec::new();
        for r in &existing_roles {
            vec.push(format!(
                "{}:{}",
                r.name,
                if r.encrypted_password.is_some() {
                    "[FILTERED]"
                } else {
                    "(null)"
                }
            ));
        }

        info!("postgres roles (total {}): {:?}", vec.len(), vec);
    }

    // Process delta operations first
    if let Some(ops) = &spec.delta_operations {
        info!("processing role renames");
        for op in ops {
            match op.action.as_ref() {
                "delete_role" => {
                    // no-op now, roles will be deleted at the end of configuration
                }
                // Renaming a role drops its password, since the role name is
                // used as a salt there. It is important that this role
                // is recorded with the new `name` in the `roles` list.
                // The follow-up roles update will set the new password.
                "rename_role" => {
                    let new_name = op.new_name.as_ref().unwrap();

                    // XXX: with a limited number of roles it is fine, but consider making it a HashMap
                    if existing_roles.iter().any(|r| r.name == op.name) {
                        let query: String = format!(
                            "ALTER ROLE {} RENAME TO {}",
                            op.name.pg_quote(),
                            new_name.pg_quote()
                        );

                        warn!("renaming role '{}' to '{}'", op.name, new_name);
                        xact.execute(query.as_str(), &[])?;
                    }
                }
                _ => {}
            }
        }
    }

    // Refresh Postgres roles info to handle possible role renames
    let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;

    info!(
        "handling cluster spec roles (total {})",
        spec.cluster.roles.len()
    );
    for role in &spec.cluster.roles {
        let name = &role.name;
        // XXX: with a limited number of roles it is fine, but consider making it a HashMap
        let pg_role = existing_roles.iter().find(|r| r.name == *name);

        enum RoleAction {
            None,
            Update,
            Create,
        }
        let action = if let Some(r) = pg_role {
            if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
                || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
            {
                RoleAction::Update
            } else if let Some(pg_pwd) = &r.encrypted_password {
                // Check whether the password changed or not (trim the 'md5' prefix first, if any).
                //
                // This is a backward compatibility hack, which comes from the times when we were using
                // md5 for everyone and hashes were stored in the console db without the md5 prefix. So when
                // a role comes from the control plane (json spec), `Role.encrypted_password` doesn't have the
                // md5 prefix, but when a role comes from Postgres (`get_existing_roles` / `existing_roles`)
                // it has this prefix. This is the only place so far where we compare hashes, so it seems
                // to be the best candidate to place this compatibility layer.
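                // For example (hypothetical values): the spec may carry
                // "5f4dcc3b5aa765d61d8327deb882cf99", while Postgres reports the same
                // password as "md55f4dcc3b5aa765d61d8327deb882cf99".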
                let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
                    stripped
                } else {
                    pg_pwd
                };
                if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
                    RoleAction::Update
                } else {
                    RoleAction::None
                }
            } else {
                RoleAction::None
            }
        } else {
            RoleAction::Create
        };

        match action {
            RoleAction::None => {}
            RoleAction::Update => {
                // This can be run on /every/ role! Not just ones created through the console.
                // This means that if you add some funny ALTER here that adds a permission,
                // this will get run even on user-created roles! This will result in different
                // behavior before and after a spec gets reapplied. The below ALTER as it stands
                // now only grants LOGIN and changes the password. Please do not allow this branch
                // to do anything silly.
                let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
                query.push_str(&role.to_pg_options());
                xact.execute(query.as_str(), &[])?;
            }
            RoleAction::Create => {
                // This branch only runs when roles are created through the console, so it is
                // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
                // from neon_superuser.
                let mut query: String = format!(
                    "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
                    name.pg_quote()
                );
                if jwks_roles.contains(name.as_str()) {
                    query = format!("CREATE ROLE {}", name.pg_quote());
                }
                info!("running role create query: '{}'", &query);
                query.push_str(&role.to_pg_options());
                xact.execute(query.as_str(), &[])?;
            }
        }

        if span_enabled!(Level::INFO) {
            let pwd = if role.encrypted_password.is_some() {
                "[FILTERED]"
            } else {
                "(null)"
            };
            let action_str = match action {
                RoleAction::None => "",
                RoleAction::Create => " -> create",
                RoleAction::Update => " -> update",
            };
            info!(" - {}:{}{}", name, pwd, action_str);
        }
    }

    xact.commit()?;

    Ok(())
}

/// Reassign all dependent objects and delete the requested roles.
#[instrument(skip_all)]
pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
    if let Some(ops) = &spec.delta_operations {
        // First, reassign all dependent objects to the db owners.
        info!("reassigning dependent objects of to-be-deleted roles");

        // Fetch existing roles. We could've exported and used `existing_roles` from
        // `handle_roles()`, but we only make that list there before creating new roles.
        // Which is probably fine, as we never create to-be-deleted roles, but that'd
        // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
        // buffers already, so this shouldn't be a big deal.
        let mut xact = client.transaction()?;
        let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
        xact.commit()?;

        for op in ops {
            // Check that the role is still present in Postgres, as this could be a
            // restart with the same spec after role deletion.
            if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
                reassign_owned_objects(spec, connstr, &op.name)?;
            }
        }

        // Second, proceed with role deletions.
        info!("processing role deletions");
        let mut xact = client.transaction()?;
        for op in ops {
            // We do not check whether the role exists or not;
            // Postgres will take care of it for us.
            if op.action == "delete_role" {
                let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());

                warn!("deleting role '{}'", &op.name);
                xact.execute(query.as_str(), &[])?;
            }
        }
        xact.commit()?;
    }

    Ok(())
}

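/// Connect to a single database using `conf` and reassign everything owned by
/// `role_name` to `db_owner`, then drop whatever the role still owns (at that
/// point, only privileges).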
fn reassign_owned_objects_in_one_db(
    conf: Config,
    role_name: &PgIdent,
    db_owner: &PgIdent,
) -> Result<()> {
    let mut client = conf.connect(NoTls)?;

    // This will reassign all dependent objects to the db owner
    let reassign_query = format!(
        "REASSIGN OWNED BY {} TO {}",
        role_name.pg_quote(),
        db_owner.pg_quote()
    );
    info!(
        "reassigning objects owned by '{}' in db '{}' to '{}'",
        role_name,
        conf.get_dbname().unwrap_or(""),
        db_owner
    );
    client.simple_query(&reassign_query)?;

    // This will now only drop the privileges of the role
    let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
    client.simple_query(&drop_query)?;
    Ok(())
}

// Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
    for db in &spec.cluster.databases {
        if db.owner != *role_name {
            let mut conf = Config::from_str(connstr)?;
            conf.dbname(&db.name);
            reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
        }
    }

    // Also handle the case when there are no databases in the spec.
    // In this case we need to reassign objects in the default database.
    let conf = Config::from_str(connstr)?;
    let db_owner = PgIdent::from_str("cloud_admin")?;
    reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;

    Ok(())
}

/// This follows mostly the same logic as `handle_roles()`, except that we
/// do not use an explicit transaction block, since major database operations
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
/// atomicity should be enough here due to the order of operations and various checks,
/// which together give us idempotency.
#[instrument(skip_all)]
pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
    let existing_dbs = get_existing_dbs(client)?;

    // Print a list of existing Postgres databases (only if INFO level is enabled)
    if span_enabled!(Level::INFO) {
        let mut vec = Vec::new();
        for (dbname, db) in &existing_dbs {
            vec.push(format!("{}:{}", dbname, db.owner));
        }
        info!("postgres databases (total {}): {:?}", vec.len(), vec);
    }

    // Process delta operations first
    if let Some(ops) = &spec.delta_operations {
        info!("processing delta operations on databases");
        for op in ops {
            match op.action.as_ref() {
                // We do not check whether the DB exists or not;
                // Postgres will take care of it for us.
                "delete_db" => {
                    // In Postgres we can't drop a database if it is a template.
                    // So we need to unset the template flag first, but it could
                    // be a retry, so we could've already dropped the database.
                    // Check that the database exists first to make it idempotent.
                    let unset_template_query: String = format!(
                        "
                        DO $$
                        BEGIN
                            IF EXISTS(
                                SELECT 1
                                FROM pg_catalog.pg_database
                                WHERE datname = {}
                            )
                            THEN
                                ALTER DATABASE {} is_template false;
                            END IF;
                        END
                        $$;",
                        escape_literal(&op.name),
                        &op.name.pg_quote()
                    );
                    // Use FORCE to drop the database even if there are active connections.
                    // We run this from `cloud_admin`, so it should have enough privileges.
                    // NB: there could be other db states which prevent us from dropping
                    // the database. For example, if the db is used by any active subscription
                    // or replication slot.
                    // TODO: deal with it once we allow logical replication. The proper fix should
                    // involve returning an error code to the control plane, so it could
                    // figure out that this is a non-retryable error, return it to the user
                    // and fail the operation permanently.
                    let drop_db_query: String = format!(
                        "DROP DATABASE IF EXISTS {} WITH (FORCE)",
                        &op.name.pg_quote()
                    );

                    warn!("deleting database '{}'", &op.name);
                    client.execute(unset_template_query.as_str(), &[])?;
                    client.execute(drop_db_query.as_str(), &[])?;
                }
                "rename_db" => {
                    let new_name = op.new_name.as_ref().unwrap();

                    if existing_dbs.contains_key(&op.name) {
                        let query: String = format!(
                            "ALTER DATABASE {} RENAME TO {}",
                            op.name.pg_quote(),
                            new_name.pg_quote()
                        );

                        warn!("renaming database '{}' to '{}'", op.name, new_name);
                        client.execute(query.as_str(), &[])?;
                    }
                }
                _ => {}
            }
        }
    }

    // Refresh Postgres databases info to handle possible renames
    let existing_dbs = get_existing_dbs(client)?;

    info!(
        "handling cluster spec databases (total {})",
        spec.cluster.databases.len()
    );
    for db in &spec.cluster.databases {
        let name = &db.name;
        let pg_db = existing_dbs.get(name);

        enum DatabaseAction {
            None,
            Update,
            Create,
        }
        let action = if let Some(r) = pg_db {
            // XXX: the db owner name is returned as a quoted string from Postgres
            // when quoting is needed.
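            // For example (hypothetical): an owner created as "MixedCase" comes back
            // wrapped in double quotes, while a plain lowercase "alice" comes back unquoted.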
            let new_owner = if r.owner.starts_with('"') {
                db.owner.pg_quote()
            } else {
                db.owner.clone()
            };

            if new_owner != r.owner {
                // Update the owner
                DatabaseAction::Update
            } else {
                DatabaseAction::None
            }
        } else {
            DatabaseAction::Create
        };

        match action {
            DatabaseAction::None => {}
            DatabaseAction::Update => {
                let query: String = format!(
                    "ALTER DATABASE {} OWNER TO {}",
                    name.pg_quote(),
                    db.owner.pg_quote()
                );
                let _guard = info_span!("executing", query).entered();
                client.execute(query.as_str(), &[])?;
            }
            DatabaseAction::Create => {
                let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
                query.push_str(&db.to_pg_options());
                let _guard = info_span!("executing", query).entered();
                client.execute(query.as_str(), &[])?;
                let grant_query: String = format!(
                    "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
                    name.pg_quote()
                );
                client.execute(grant_query.as_str(), &[])?;
            }
        };

        if span_enabled!(Level::INFO) {
            let action_str = match action {
                DatabaseAction::None => "",
                DatabaseAction::Create => " -> create",
                DatabaseAction::Update => " -> update",
            };
            info!(" - {}:{}{}", db.name, db.owner, action_str);
        }
    }

    Ok(())
}

/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users to create trusted extensions and re-create the `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(
    spec: &ComputeSpec,
    client: &mut Client,
    connstr: &str,
    enable_anon_extension: bool,
) -> Result<()> {
    info!("modifying database permissions");
    let existing_dbs = get_existing_dbs(client)?;

    // Do some per-database access adjustments. We'd better do this at db creation time,
    // but CREATE DATABASE isn't transactional. So we cannot create the db + do some grants
    // atomically.
    for db in &spec.cluster.databases {
        match existing_dbs.get(&db.name) {
            Some(pg_db) => {
                if pg_db.restrict_conn || pg_db.invalid {
                    info!(
                        "skipping grants for db {} (invalid: {}, connections not allowed: {})",
                        db.name, pg_db.invalid, pg_db.restrict_conn
                    );
                    continue;
                }
            }
            None => {
                bail!(
                    "database {} doesn't exist in Postgres after handle_databases()",
                    db.name
                );
            }
        }

        let mut conf = Config::from_str(connstr)?;
        conf.dbname(&db.name);

        let mut db_client = conf.connect(NoTls)?;

        // This will only change ownership on the schema itself, not the objects
        // inside it. Without it, the owner of the `public` schema will be `cloud_admin`,
        // and the database owner cannot do anything with it. The SQL procedure ensures
        // that it won't error out if the schema `public` doesn't exist.
        let alter_query = format!(
            "DO $$\n\
                DECLARE\n\
                    schema_owner TEXT;\n\
                BEGIN\n\
                    IF EXISTS(\n\
                        SELECT nspname\n\
                        FROM pg_catalog.pg_namespace\n\
                        WHERE nspname = 'public'\n\
                    )\n\
                    THEN\n\
                        SELECT nspowner::regrole::text\n\
                            FROM pg_catalog.pg_namespace\n\
                            WHERE nspname = 'public'\n\
                            INTO schema_owner;\n\
                \n\
                        IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
                        THEN\n\
                            ALTER SCHEMA public OWNER TO {};\n\
                        END IF;\n\
                    END IF;\n\
                END\n\
            $$;",
            db.owner.pg_quote()
        );
        db_client.simple_query(&alter_query)?;

        // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
        // This is needed because, since Postgres 15, this privilege is removed by default.
        // TODO: web_access hasn't been created for almost a year. It could be that we have
        // active users of 1-year-old projects, but hopefully not, so check it and
        // remove this code if possible. The worst thing that could happen is that
        // users won't be able to use the public schema in NEW databases created in
        // very OLD projects.
        //
        // Also, alter default permissions so that relations created by extensions can be
        // used by neon_superuser without permission issues.
        let grant_query = "DO $$\n\
                BEGIN\n\
                    IF EXISTS(\n\
                        SELECT nspname\n\
                        FROM pg_catalog.pg_namespace\n\
                        WHERE nspname = 'public'\n\
                    ) AND\n\
                    current_setting('server_version_num')::int/10000 >= 15\n\
                    THEN\n\
                        IF EXISTS(\n\
                            SELECT rolname\n\
                            FROM pg_catalog.pg_roles\n\
                            WHERE rolname = 'web_access'\n\
                        )\n\
                        THEN\n\
                            GRANT CREATE ON SCHEMA public TO web_access;\n\
                        END IF;\n\
                    END IF;\n\
                    IF EXISTS(\n\
                        SELECT nspname\n\
                        FROM pg_catalog.pg_namespace\n\
                        WHERE nspname = 'public'\n\
                    )\n\
                    THEN\n\
                        ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\
                        ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\
                    END IF;\n\
                END\n\
            $$;"
            .to_string();

        info!(
            "grant query for db {} : {}",
            &db.name,
            inlinify(&grant_query)
        );
        db_client.simple_query(&grant_query)?;

        // it is important to run this after all grants
        if enable_anon_extension {
            handle_extension_anon(spec, &db.owner, &mut db_client, false)
                .context("handle_grants handle_extension_anon")?;
        }
    }

    Ok(())
}

/// Create required system extensions
#[instrument(skip_all)]
pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
    if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
        if libs.contains("pg_stat_statements") {
            // Create the extension only if this compute really needs it
            let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
            info!("creating system extensions with query: {}", query);
            client.simple_query(query)?;
        }
    }

    Ok(())
}

/// Run `CREATE EXTENSION` and `ALTER EXTENSION neon UPDATE` for the postgres database
#[instrument(skip_all)]
pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
    info!("handle extension neon");

    let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
    client.simple_query(query)?;

    query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
    info!("create neon extension with query: {}", query);
    client.simple_query(query)?;

    query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
    client.simple_query(query)?;

    query = "ALTER EXTENSION neon SET SCHEMA neon";
    info!("alter neon extension schema with query: {}", query);
    client.simple_query(query)?;

    // This will be a no-op if the extension is already up to date,
    // which may happen in two cases:
    // - extension was just installed
    // - extension was already installed and is up to date
    let query = "ALTER EXTENSION neon UPDATE";
    info!("update neon extension version with query: {}", query);
    if let Err(e) = client.simple_query(query) {
        error!(
            "failed to upgrade neon extension during `handle_extension_neon`: {}",
            e
        );
    }

    Ok(())
}

#[instrument(skip_all)]
pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
    info!("handle neon extension upgrade");
    let query = "ALTER EXTENSION neon UPDATE";
    info!("update neon extension version with query: {}", query);
    client.simple_query(query)?;

    Ok(())
}

#[instrument(skip_all)]
pub fn handle_migrations(client: &mut Client) -> Result<()> {
    info!("handle migrations");

    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

    // Add new migrations in numerical order.
    let migrations = [
        include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
        include_str!("./migrations/0002-alter_roles.sql"),
        include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
        include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
        include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
        include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
        include_str!(
            "./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
        ),
        include_str!(
            "./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
        ),
        include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
        include_str!(
            "./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
        ),
        include_str!(
            "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
        ),
    ];

    MigrationRunner::new(client, &migrations).run_migrations()?;

    Ok(())
}

/// Connect to the database as superuser and pre-create the anon extension
/// if it is present in shared_preload_libraries
#[instrument(skip_all)]
pub fn handle_extension_anon(
    spec: &ComputeSpec,
    db_owner: &str,
    db_client: &mut Client,
    grants_only: bool,
) -> Result<()> {
    info!("handle extension anon");

    if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
        if libs.contains("anon") {
            if !grants_only {
                // check if the extension is already initialized using anon.is_initialized()
                let query = "SELECT anon.is_initialized()";
                match db_client.query(query, &[]) {
                    Ok(rows) => {
                        if !rows.is_empty() {
                            let is_initialized: bool = rows[0].get(0);
                            if is_initialized {
                                info!("anon extension is already initialized");
                                return Ok(());
                            }
                        }
                    }
                    Err(e) => {
                        warn!(
                            "anon extension is_initialized check failed with expected error: {}",
                            e
                        );
                    }
                };

                // Create the anon extension if this compute needs it.
                // Users cannot create it themselves, because superuser is required.
                let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
                info!("creating anon extension with query: {}", query);
                match db_client.query(query, &[]) {
                    Ok(_) => {}
                    Err(e) => {
                        error!("anon extension creation failed with error: {}", e);
                        return Ok(());
                    }
                }

                // check that the extension is installed
                query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
                let rows = db_client.query(query, &[])?;
                if rows.is_empty() {
                    error!("anon extension is not installed");
                    return Ok(());
                }

                // Initialize the anon extension.
                // This also requires superuser privileges, so users cannot do it themselves.
                query = "SELECT anon.init()";
                match db_client.query(query, &[]) {
                    Ok(_) => {}
                    Err(e) => {
                        error!("anon.init() failed with error: {}", e);
                        return Ok(());
                    }
                }
            }

            // check that the extension is installed; if not, bail early
            let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
            match db_client.query(query, &[]) {
                Ok(rows) => {
                    if rows.is_empty() {
                        error!("anon extension is not installed");
                        return Ok(());
                    }
                }
                Err(e) => {
                    error!("anon extension check failed with error: {}", e);
                    return Ok(());
                }
            };

            let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
            info!("granting anon extension permissions with query: {}", query);
            db_client.simple_query(&query)?;

            // Grant permissions to db_owner to use anon extension functions
            let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
            info!("granting anon extension permissions with query: {}", query);
            db_client.simple_query(&query)?;

            // This is needed because some functions are defined as SECURITY DEFINER.
            // In Postgres, SECURITY DEFINER functions are executed with the privileges
            // of the owner.
            // In the anon extension this is needed to access some GUCs, which are only
            // accessible to superuser. But we've patched postgres to allow db_owner to
            // access them as well. So we need to change the owner of these functions to db_owner.
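            // The generated statements look like (illustrative):
            //   ALTER FUNCTION anon.init() OWNER TO <db_owner>;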
            let query = format!("
                SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};'
                from pg_proc p
                join pg_namespace nsp ON p.pronamespace = nsp.oid
                where nsp.nspname = 'anon';", db_owner);

            info!("change anon extension functions owner to db owner");
            db_client.simple_query(&query)?;

            // affects views as well
            let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
            info!("granting anon extension permissions with query: {}", query);
            db_client.simple_query(&query)?;

            let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
            info!("granting anon extension permissions with query: {}", query);
            db_client.simple_query(&query)?;
        }
    }

    Ok(())
}