Line data Source code
1 : use std::fs::File;
2 : use std::path::Path;
3 : use std::str::FromStr;
4 :
5 : use anyhow::{anyhow, bail, Context, Result};
6 : use postgres::config::Config;
7 : use postgres::{Client, NoTls};
8 : use reqwest::StatusCode;
9 : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
10 :
11 : use crate::config;
12 : use crate::logger::inlinify;
13 : use crate::migration::MigrationRunner;
14 : use crate::params::PG_HBA_ALL_MD5;
15 : use crate::pg_helpers::*;
16 :
17 : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
18 : use compute_api::spec::{ComputeSpec, PgIdent, Role};
19 :
20 : // Do control plane request and return response if any. In case of error it
21 : // returns a bool flag indicating whether it makes sense to retry the request
22 : // and a string with error message.
23 0 : fn do_control_plane_request(
24 0 : uri: &str,
25 0 : jwt: &str,
26 0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
27 0 : let resp = reqwest::blocking::Client::new()
28 0 : .get(uri)
29 0 : .header("Authorization", format!("Bearer {}", jwt))
30 0 : .send()
31 0 : .map_err(|e| {
32 0 : (
33 0 : true,
34 0 : format!("could not perform spec request to control plane: {}", e),
35 0 : )
36 0 : })?;
37 :
38 0 : match resp.status() {
39 0 : StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
40 0 : Ok(spec_resp) => Ok(spec_resp),
41 0 : Err(e) => Err((
42 0 : true,
43 0 : format!("could not deserialize control plane response: {}", e),
44 0 : )),
45 : },
46 : StatusCode::SERVICE_UNAVAILABLE => {
47 0 : Err((true, "control plane is temporarily unavailable".to_string()))
48 : }
49 : StatusCode::BAD_GATEWAY => {
50 : // We have a problem with intermittent 502 errors now
51 : // https://github.com/neondatabase/cloud/issues/2353
52 : // It's fine to retry GET request in this case.
53 0 : Err((true, "control plane request failed with 502".to_string()))
54 : }
55 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
56 : // or some internal failure happened. Doesn't make much sense to retry in this case.
57 0 : _ => Err((
58 0 : false,
59 0 : format!(
60 0 : "unexpected control plane response status code: {}",
61 0 : resp.status()
62 0 : ),
63 0 : )),
64 : }
65 0 : }
66 :
67 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
68 : /// env variable is set, it will be used for authorization.
69 0 : pub fn get_spec_from_control_plane(
70 0 : base_uri: &str,
71 0 : compute_id: &str,
72 0 : ) -> Result<Option<ComputeSpec>> {
73 0 : let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
74 0 : let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
75 0 : Ok(v) => v,
76 0 : Err(_) => "".to_string(),
77 : };
78 0 : let mut attempt = 1;
79 0 : let mut spec: Result<Option<ComputeSpec>> = Ok(None);
80 0 :
81 0 : info!("getting spec from control plane: {}", cp_uri);
82 :
83 : // Do 3 attempts to get spec from the control plane using the following logic:
84 : // - network error -> then retry
85 : // - compute id is unknown or any other error -> bail out
86 : // - no spec for compute yet (Empty state) -> return Ok(None)
87 : // - got spec -> return Ok(Some(spec))
88 0 : while attempt < 4 {
89 0 : spec = match do_control_plane_request(&cp_uri, &jwt) {
90 0 : Ok(spec_resp) => match spec_resp.status {
91 0 : ControlPlaneComputeStatus::Empty => Ok(None),
92 : ControlPlaneComputeStatus::Attached => {
93 0 : if let Some(spec) = spec_resp.spec {
94 0 : Ok(Some(spec))
95 : } else {
96 0 : bail!("compute is attached, but spec is empty")
97 : }
98 : }
99 : },
100 0 : Err((retry, msg)) => {
101 0 : if retry {
102 0 : Err(anyhow!(msg))
103 : } else {
104 0 : bail!(msg);
105 : }
106 : }
107 : };
108 :
109 0 : if let Err(e) = &spec {
110 0 : error!("attempt {} to get spec failed with: {}", attempt, e);
111 : } else {
112 0 : return spec;
113 : }
114 :
115 0 : attempt += 1;
116 0 : std::thread::sleep(std::time::Duration::from_millis(100));
117 : }
118 :
119 : // All attempts failed, return error.
120 0 : spec
121 0 : }
122 :
123 : /// Check `pg_hba.conf` and update if needed to allow external connections.
124 0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
125 0 : // XXX: consider making it a part of spec.json
126 0 : info!("checking pg_hba.conf");
127 0 : let pghba_path = pgdata_path.join("pg_hba.conf");
128 0 :
129 0 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
130 0 : info!("updated pg_hba.conf to allow external connections");
131 : } else {
132 0 : info!("pg_hba.conf is up-to-date");
133 : }
134 :
135 0 : Ok(())
136 0 : }
137 :
138 : /// Create a standby.signal file
139 0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
140 0 : // XXX: consider making it a part of spec.json
141 0 : info!("adding standby.signal");
142 0 : let signalfile = pgdata_path.join("standby.signal");
143 0 :
144 0 : if !signalfile.exists() {
145 0 : info!("created standby.signal");
146 0 : File::create(signalfile)?;
147 : } else {
148 0 : info!("reused pre-existing standby.signal");
149 : }
150 0 : Ok(())
151 0 : }
152 :
153 : /// Compute could be unexpectedly shut down, for example, during the
154 : /// database dropping. This leaves the database in the invalid state,
155 : /// which prevents new db creation with the same name. This function
156 : /// will clean it up before proceeding with catalog updates. All
157 : /// possible future cleanup operations may go here too.
158 0 : #[instrument(skip_all)]
159 : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
160 : let existing_dbs = get_existing_dbs(client)?;
161 :
162 : for (_, db) in existing_dbs {
163 : if db.invalid {
164 : // After recent commit in Postgres, interrupted DROP DATABASE
165 : // leaves the database in the invalid state. According to the
166 : // commit message, the only option for user is to drop it again.
167 : // See:
168 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
169 : //
170 : // Postgres Neon extension is done the way, that db is de-registered
171 : // in the control plane metadata only after it is dropped. So there is
172 : // a chance that it still thinks that db should exist. This means
173 : // that it will be re-created by `handle_databases()`. Yet, it's fine
174 : // as user can just repeat drop (in vanilla Postgres they would need
175 : // to do the same, btw).
176 : let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
177 : info!("dropping invalid database {}", db.name);
178 : client.execute(query.as_str(), &[])?;
179 : }
180 : }
181 :
182 : Ok(())
183 : }
184 :
185 : /// Given a cluster spec json and open transaction it handles roles creation,
186 : /// deletion and update.
187 0 : #[instrument(skip_all)]
188 : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
189 : let mut xact = client.transaction()?;
190 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
191 :
192 : // Print a list of existing Postgres roles (only in debug mode)
193 : if span_enabled!(Level::INFO) {
194 : let mut vec = Vec::new();
195 : for r in &existing_roles {
196 : vec.push(format!(
197 : "{}:{}",
198 : r.name,
199 : if r.encrypted_password.is_some() {
200 : "[FILTERED]"
201 : } else {
202 : "(null)"
203 : }
204 : ));
205 : }
206 :
207 : info!("postgres roles (total {}): {:?}", vec.len(), vec);
208 : }
209 :
210 : // Process delta operations first
211 : if let Some(ops) = &spec.delta_operations {
212 : info!("processing role renames");
213 : for op in ops {
214 : match op.action.as_ref() {
215 : "delete_role" => {
216 : // no-op now, roles will be deleted at the end of configuration
217 : }
218 : // Renaming role drops its password, since role name is
219 : // used as a salt there. It is important that this role
220 : // is recorded with a new `name` in the `roles` list.
221 : // Follow up roles update will set the new password.
222 : "rename_role" => {
223 : let new_name = op.new_name.as_ref().unwrap();
224 :
225 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
226 0 : if existing_roles.iter().any(|r| r.name == op.name) {
227 : let query: String = format!(
228 : "ALTER ROLE {} RENAME TO {}",
229 : op.name.pg_quote(),
230 : new_name.pg_quote()
231 : );
232 :
233 : warn!("renaming role '{}' to '{}'", op.name, new_name);
234 : xact.execute(query.as_str(), &[])?;
235 : }
236 : }
237 : _ => {}
238 : }
239 : }
240 : }
241 :
242 : // Refresh Postgres roles info to handle possible roles renaming
243 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
244 :
245 : info!(
246 : "handling cluster spec roles (total {})",
247 : spec.cluster.roles.len()
248 : );
249 : for role in &spec.cluster.roles {
250 : let name = &role.name;
251 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
252 0 : let pg_role = existing_roles.iter().find(|r| r.name == *name);
253 :
254 : enum RoleAction {
255 : None,
256 : Update,
257 : Create,
258 : }
259 : let action = if let Some(r) = pg_role {
260 : if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
261 : || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
262 : {
263 : RoleAction::Update
264 : } else if let Some(pg_pwd) = &r.encrypted_password {
265 : // Check whether password changed or not (trim 'md5' prefix first if any)
266 : //
267 : // This is a backward compatibility hack, which comes from the times when we were using
268 : // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
269 : // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
270 : // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
271 : // Here is the only place so far where we compare hashes, so it seems to be the best candidate
272 : // to place this compatibility layer.
273 : let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
274 : stripped
275 : } else {
276 : pg_pwd
277 : };
278 : if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
279 : RoleAction::Update
280 : } else {
281 : RoleAction::None
282 : }
283 : } else {
284 : RoleAction::None
285 : }
286 : } else {
287 : RoleAction::Create
288 : };
289 :
290 : match action {
291 : RoleAction::None => {}
292 : RoleAction::Update => {
293 : // This can be run on /every/ role! Not just ones created through the console.
294 : // This means that if you add some funny ALTER here that adds a permission,
295 : // this will get run even on user-created roles! This will result in different
296 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
297 : // now only grants LOGIN and changes the password. Please do not allow this branch
298 : // to do anything silly.
299 : let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
300 : query.push_str(&role.to_pg_options());
301 : xact.execute(query.as_str(), &[])?;
302 : }
303 : RoleAction::Create => {
304 : // This branch only runs when roles are created through the console, so it is
305 : // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
306 : // from neon_superuser.
307 : let mut query: String = format!(
308 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
309 : name.pg_quote()
310 : );
311 : info!("running role create query: '{}'", &query);
312 : query.push_str(&role.to_pg_options());
313 : xact.execute(query.as_str(), &[])?;
314 : }
315 : }
316 :
317 : if span_enabled!(Level::INFO) {
318 : let pwd = if role.encrypted_password.is_some() {
319 : "[FILTERED]"
320 : } else {
321 : "(null)"
322 : };
323 : let action_str = match action {
324 : RoleAction::None => "",
325 : RoleAction::Create => " -> create",
326 : RoleAction::Update => " -> update",
327 : };
328 : info!(" - {}:{}{}", name, pwd, action_str);
329 : }
330 : }
331 :
332 : xact.commit()?;
333 :
334 : Ok(())
335 : }
336 :
337 : /// Reassign all dependent objects and delete requested roles.
338 0 : #[instrument(skip_all)]
339 : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
340 : if let Some(ops) = &spec.delta_operations {
341 : // First, reassign all dependent objects to db owners.
342 : info!("reassigning dependent objects of to-be-deleted roles");
343 :
344 : // Fetch existing roles. We could've exported and used `existing_roles` from
345 : // `handle_roles()`, but we only make this list there before creating new roles.
346 : // Which is probably fine as we never create to-be-deleted roles, but that'd
347 : // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
348 : // buffers already, so this shouldn't be a big deal.
349 : let mut xact = client.transaction()?;
350 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
351 : xact.commit()?;
352 :
353 : for op in ops {
354 : // Check that role is still present in Postgres, as this could be a
355 : // restart with the same spec after role deletion.
356 0 : if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
357 : reassign_owned_objects(spec, connstr, &op.name)?;
358 : }
359 : }
360 :
361 : // Second, proceed with role deletions.
362 : info!("processing role deletions");
363 : let mut xact = client.transaction()?;
364 : for op in ops {
365 : // We do not check either role exists or not,
366 : // Postgres will take care of it for us
367 : if op.action == "delete_role" {
368 : let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
369 :
370 : warn!("deleting role '{}'", &op.name);
371 : xact.execute(query.as_str(), &[])?;
372 : }
373 : }
374 : xact.commit()?;
375 : }
376 :
377 : Ok(())
378 : }
379 :
380 0 : fn reassign_owned_objects_in_one_db(
381 0 : conf: Config,
382 0 : role_name: &PgIdent,
383 0 : db_owner: &PgIdent,
384 0 : ) -> Result<()> {
385 0 : let mut client = conf.connect(NoTls)?;
386 :
387 : // This will reassign all dependent objects to the db owner
388 0 : let reassign_query = format!(
389 0 : "REASSIGN OWNED BY {} TO {}",
390 0 : role_name.pg_quote(),
391 0 : db_owner.pg_quote()
392 0 : );
393 0 : info!(
394 0 : "reassigning objects owned by '{}' in db '{}' to '{}'",
395 0 : role_name,
396 0 : conf.get_dbname().unwrap_or(""),
397 : db_owner
398 : );
399 0 : client.simple_query(&reassign_query)?;
400 :
401 : // This now will only drop privileges of the role
402 0 : let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
403 0 : client.simple_query(&drop_query)?;
404 0 : Ok(())
405 0 : }
406 :
407 : // Reassign all owned objects in all databases to the owner of the database.
408 0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
409 0 : for db in &spec.cluster.databases {
410 0 : if db.owner != *role_name {
411 0 : let mut conf = Config::from_str(connstr)?;
412 0 : conf.dbname(&db.name);
413 0 : reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
414 0 : }
415 : }
416 :
417 : // Also handle case when there are no databases in the spec.
418 : // In this case we need to reassign objects in the default database.
419 0 : let conf = Config::from_str(connstr)?;
420 0 : let db_owner = PgIdent::from_str("cloud_admin")?;
421 0 : reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
422 :
423 0 : Ok(())
424 0 : }
425 :
426 : /// It follows mostly the same logic as `handle_roles()` excepting that we
427 : /// does not use an explicit transactions block, since major database operations
428 : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
429 : /// atomicity should be enough here due to the order of operations and various checks,
430 : /// which together provide us idempotency.
431 0 : #[instrument(skip_all)]
432 : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
433 : let existing_dbs = get_existing_dbs(client)?;
434 :
435 : // Print a list of existing Postgres databases (only in debug mode)
436 : if span_enabled!(Level::INFO) {
437 : let mut vec = Vec::new();
438 : for (dbname, db) in &existing_dbs {
439 : vec.push(format!("{}:{}", dbname, db.owner));
440 : }
441 : info!("postgres databases (total {}): {:?}", vec.len(), vec);
442 : }
443 :
444 : // Process delta operations first
445 : if let Some(ops) = &spec.delta_operations {
446 : info!("processing delta operations on databases");
447 : for op in ops {
448 : match op.action.as_ref() {
449 : // We do not check either DB exists or not,
450 : // Postgres will take care of it for us
451 : "delete_db" => {
452 : // In Postgres we can't drop a database if it is a template.
453 : // So we need to unset the template flag first, but it could
454 : // be a retry, so we could've already dropped the database.
455 : // Check that database exists first to make it idempotent.
456 : let unset_template_query: String = format!(
457 : "
458 : DO $$
459 : BEGIN
460 : IF EXISTS(
461 : SELECT 1
462 : FROM pg_catalog.pg_database
463 : WHERE datname = {}
464 : )
465 : THEN
466 : ALTER DATABASE {} is_template false;
467 : END IF;
468 : END
469 : $$;",
470 : escape_literal(&op.name),
471 : &op.name.pg_quote()
472 : );
473 : // Use FORCE to drop database even if there are active connections.
474 : // We run this from `cloud_admin`, so it should have enough privileges.
475 : // NB: there could be other db states, which prevent us from dropping
476 : // the database. For example, if db is used by any active subscription
477 : // or replication slot.
478 : // TODO: deal with it once we allow logical replication. Proper fix should
479 : // involve returning an error code to the control plane, so it could
480 : // figure out that this is a non-retryable error, return it to the user
481 : // and fail operation permanently.
482 : let drop_db_query: String = format!(
483 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
484 : &op.name.pg_quote()
485 : );
486 :
487 : warn!("deleting database '{}'", &op.name);
488 : client.execute(unset_template_query.as_str(), &[])?;
489 : client.execute(drop_db_query.as_str(), &[])?;
490 : }
491 : "rename_db" => {
492 : let new_name = op.new_name.as_ref().unwrap();
493 :
494 : if existing_dbs.contains_key(&op.name) {
495 : let query: String = format!(
496 : "ALTER DATABASE {} RENAME TO {}",
497 : op.name.pg_quote(),
498 : new_name.pg_quote()
499 : );
500 :
501 : warn!("renaming database '{}' to '{}'", op.name, new_name);
502 : client.execute(query.as_str(), &[])?;
503 : }
504 : }
505 : _ => {}
506 : }
507 : }
508 : }
509 :
510 : // Refresh Postgres databases info to handle possible renames
511 : let existing_dbs = get_existing_dbs(client)?;
512 :
513 : info!(
514 : "handling cluster spec databases (total {})",
515 : spec.cluster.databases.len()
516 : );
517 : for db in &spec.cluster.databases {
518 : let name = &db.name;
519 : let pg_db = existing_dbs.get(name);
520 :
521 : enum DatabaseAction {
522 : None,
523 : Update,
524 : Create,
525 : }
526 : let action = if let Some(r) = pg_db {
527 : // XXX: db owner name is returned as quoted string from Postgres,
528 : // when quoting is needed.
529 : let new_owner = if r.owner.starts_with('"') {
530 : db.owner.pg_quote()
531 : } else {
532 : db.owner.clone()
533 : };
534 :
535 : if new_owner != r.owner {
536 : // Update the owner
537 : DatabaseAction::Update
538 : } else {
539 : DatabaseAction::None
540 : }
541 : } else {
542 : DatabaseAction::Create
543 : };
544 :
545 : match action {
546 : DatabaseAction::None => {}
547 : DatabaseAction::Update => {
548 : let query: String = format!(
549 : "ALTER DATABASE {} OWNER TO {}",
550 : name.pg_quote(),
551 : db.owner.pg_quote()
552 : );
553 : let _guard = info_span!("executing", query).entered();
554 : client.execute(query.as_str(), &[])?;
555 : }
556 : DatabaseAction::Create => {
557 : let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
558 : query.push_str(&db.to_pg_options());
559 : let _guard = info_span!("executing", query).entered();
560 : client.execute(query.as_str(), &[])?;
561 : let grant_query: String = format!(
562 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
563 : name.pg_quote()
564 : );
565 : client.execute(grant_query.as_str(), &[])?;
566 : }
567 : };
568 :
569 : if span_enabled!(Level::INFO) {
570 : let action_str = match action {
571 : DatabaseAction::None => "",
572 : DatabaseAction::Create => " -> create",
573 : DatabaseAction::Update => " -> update",
574 : };
575 : info!(" - {}:{}{}", db.name, db.owner, action_str);
576 : }
577 : }
578 :
579 : Ok(())
580 : }
581 :
582 : /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
583 : /// to allow users creating trusted extensions and re-creating `public` schema, for example.
584 0 : #[instrument(skip_all)]
585 : pub fn handle_grants(
586 : spec: &ComputeSpec,
587 : client: &mut Client,
588 : connstr: &str,
589 : enable_anon_extension: bool,
590 : ) -> Result<()> {
591 : info!("modifying database permissions");
592 : let existing_dbs = get_existing_dbs(client)?;
593 :
594 : // Do some per-database access adjustments. We'd better do this at db creation time,
595 : // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
596 : // atomically.
597 : for db in &spec.cluster.databases {
598 : match existing_dbs.get(&db.name) {
599 : Some(pg_db) => {
600 : if pg_db.restrict_conn || pg_db.invalid {
601 : info!(
602 : "skipping grants for db {} (invalid: {}, connections not allowed: {})",
603 : db.name, pg_db.invalid, pg_db.restrict_conn
604 : );
605 : continue;
606 : }
607 : }
608 : None => {
609 : bail!(
610 : "database {} doesn't exist in Postgres after handle_databases()",
611 : db.name
612 : );
613 : }
614 : }
615 :
616 : let mut conf = Config::from_str(connstr)?;
617 : conf.dbname(&db.name);
618 :
619 : let mut db_client = conf.connect(NoTls)?;
620 :
621 : // This will only change ownership on the schema itself, not the objects
622 : // inside it. Without it owner of the `public` schema will be `cloud_admin`
623 : // and database owner cannot do anything with it. SQL procedure ensures
624 : // that it won't error out if schema `public` doesn't exist.
625 : let alter_query = format!(
626 : "DO $$\n\
627 : DECLARE\n\
628 : schema_owner TEXT;\n\
629 : BEGIN\n\
630 : IF EXISTS(\n\
631 : SELECT nspname\n\
632 : FROM pg_catalog.pg_namespace\n\
633 : WHERE nspname = 'public'\n\
634 : )\n\
635 : THEN\n\
636 : SELECT nspowner::regrole::text\n\
637 : FROM pg_catalog.pg_namespace\n\
638 : WHERE nspname = 'public'\n\
639 : INTO schema_owner;\n\
640 : \n\
641 : IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
642 : THEN\n\
643 : ALTER SCHEMA public OWNER TO {};\n\
644 : END IF;\n\
645 : END IF;\n\
646 : END\n\
647 : $$;",
648 : db.owner.pg_quote()
649 : );
650 : db_client.simple_query(&alter_query)?;
651 :
652 : // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
653 : // This is needed because since postgres 15 this privilege is removed by default.
654 : // TODO: web_access isn't created for almost 1 year. It could be that we have
655 : // active users of 1 year old projects, but hopefully not, so check it and
656 : // remove this code if possible. The worst thing that could happen is that
657 : // user won't be able to use public schema in NEW databases created in the
658 : // very OLD project.
659 : //
660 : // Also, alter default permissions so that relations created by extensions can be
661 : // used by neon_superuser without permission issues.
662 : let grant_query = "DO $$\n\
663 : BEGIN\n\
664 : IF EXISTS(\n\
665 : SELECT nspname\n\
666 : FROM pg_catalog.pg_namespace\n\
667 : WHERE nspname = 'public'\n\
668 : ) AND\n\
669 : current_setting('server_version_num')::int/10000 >= 15\n\
670 : THEN\n\
671 : IF EXISTS(\n\
672 : SELECT rolname\n\
673 : FROM pg_catalog.pg_roles\n\
674 : WHERE rolname = 'web_access'\n\
675 : )\n\
676 : THEN\n\
677 : GRANT CREATE ON SCHEMA public TO web_access;\n\
678 : END IF;\n\
679 : END IF;\n\
680 : IF EXISTS(\n\
681 : SELECT nspname\n\
682 : FROM pg_catalog.pg_namespace\n\
683 : WHERE nspname = 'public'\n\
684 : )\n\
685 : THEN\n\
686 : ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\
687 : ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\
688 : END IF;\n\
689 : END\n\
690 : $$;"
691 : .to_string();
692 :
693 : info!(
694 : "grant query for db {} : {}",
695 : &db.name,
696 : inlinify(&grant_query)
697 : );
698 : db_client.simple_query(&grant_query)?;
699 :
700 : // it is important to run this after all grants
701 : if enable_anon_extension {
702 : handle_extension_anon(spec, &db.owner, &mut db_client, false)
703 : .context("handle_grants handle_extension_anon")?;
704 : }
705 : }
706 :
707 : Ok(())
708 : }
709 :
710 : /// Create required system extensions
711 0 : #[instrument(skip_all)]
712 : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
713 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
714 : if libs.contains("pg_stat_statements") {
715 : // Create extension only if this compute really needs it
716 : let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
717 : info!("creating system extensions with query: {}", query);
718 : client.simple_query(query)?;
719 : }
720 : }
721 :
722 : Ok(())
723 : }
724 :
725 : /// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
726 0 : #[instrument(skip_all)]
727 : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
728 : info!("handle extension neon");
729 :
730 : let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
731 : client.simple_query(query)?;
732 :
733 : query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
734 : info!("create neon extension with query: {}", query);
735 : client.simple_query(query)?;
736 :
737 : query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
738 : client.simple_query(query)?;
739 :
740 : query = "ALTER EXTENSION neon SET SCHEMA neon";
741 : info!("alter neon extension schema with query: {}", query);
742 : client.simple_query(query)?;
743 :
744 : // this will be a no-op if extension is already up to date,
745 : // which may happen in two cases:
746 : // - extension was just installed
747 : // - extension was already installed and is up to date
748 : let query = "ALTER EXTENSION neon UPDATE";
749 : info!("update neon extension version with query: {}", query);
750 : if let Err(e) = client.simple_query(query) {
751 : error!(
752 : "failed to upgrade neon extension during `handle_extension_neon`: {}",
753 : e
754 : );
755 : }
756 :
757 : Ok(())
758 : }
759 :
760 0 : #[instrument(skip_all)]
761 : pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
762 : info!("handle neon extension upgrade");
763 : let query = "ALTER EXTENSION neon UPDATE";
764 : info!("update neon extension version with query: {}", query);
765 : client.simple_query(query)?;
766 :
767 : Ok(())
768 : }
769 :
770 0 : #[instrument(skip_all)]
771 : pub fn handle_migrations(client: &mut Client) -> Result<()> {
772 : info!("handle migrations");
773 :
774 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
775 : // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
776 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
777 :
778 : // Add new migrations in numerical order.
779 : let migrations = [
780 : include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
781 : include_str!("./migrations/0002-alter_roles.sql"),
782 : include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
783 : include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
784 : include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
785 : include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
786 : include_str!(
787 : "./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
788 : ),
789 : include_str!(
790 : "./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
791 : ),
792 : include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
793 : include_str!(
794 : "./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
795 : ),
796 : include_str!(
797 : "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
798 : ),
799 : ];
800 :
801 : MigrationRunner::new(client, &migrations).run_migrations()?;
802 :
803 : Ok(())
804 : }
805 :
806 : /// Connect to the database as superuser and pre-create anon extension
807 : /// if it is present in shared_preload_libraries
808 0 : #[instrument(skip_all)]
809 : pub fn handle_extension_anon(
810 : spec: &ComputeSpec,
811 : db_owner: &str,
812 : db_client: &mut Client,
813 : grants_only: bool,
814 : ) -> Result<()> {
815 : info!("handle extension anon");
816 :
817 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
818 : if libs.contains("anon") {
819 : if !grants_only {
820 : // check if extension is already initialized using anon.is_initialized()
821 : let query = "SELECT anon.is_initialized()";
822 : match db_client.query(query, &[]) {
823 : Ok(rows) => {
824 : if !rows.is_empty() {
825 : let is_initialized: bool = rows[0].get(0);
826 : if is_initialized {
827 : info!("anon extension is already initialized");
828 : return Ok(());
829 : }
830 : }
831 : }
832 : Err(e) => {
833 : warn!(
834 : "anon extension is_installed check failed with expected error: {}",
835 : e
836 : );
837 : }
838 : };
839 :
840 : // Create anon extension if this compute needs it
841 : // Users cannot create it themselves, because superuser is required.
842 : let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
843 : info!("creating anon extension with query: {}", query);
844 : match db_client.query(query, &[]) {
845 : Ok(_) => {}
846 : Err(e) => {
847 : error!("anon extension creation failed with error: {}", e);
848 : return Ok(());
849 : }
850 : }
851 :
852 : // check that extension is installed
853 : query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
854 : let rows = db_client.query(query, &[])?;
855 : if rows.is_empty() {
856 : error!("anon extension is not installed");
857 : return Ok(());
858 : }
859 :
860 : // Initialize anon extension
861 : // This also requires superuser privileges, so users cannot do it themselves.
862 : query = "SELECT anon.init()";
863 : match db_client.query(query, &[]) {
864 : Ok(_) => {}
865 : Err(e) => {
866 : error!("anon.init() failed with error: {}", e);
867 : return Ok(());
868 : }
869 : }
870 : }
871 :
872 : // check that extension is installed, if not bail early
873 : let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
874 : match db_client.query(query, &[]) {
875 : Ok(rows) => {
876 : if rows.is_empty() {
877 : error!("anon extension is not installed");
878 : return Ok(());
879 : }
880 : }
881 : Err(e) => {
882 : error!("anon extension check failed with error: {}", e);
883 : return Ok(());
884 : }
885 : };
886 :
887 : let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
888 : info!("granting anon extension permissions with query: {}", query);
889 : db_client.simple_query(&query)?;
890 :
891 : // Grant permissions to db_owner to use anon extension functions
892 : let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
893 : info!("granting anon extension permissions with query: {}", query);
894 : db_client.simple_query(&query)?;
895 :
896 : // This is needed, because some functions are defined as SECURITY DEFINER.
897 : // In Postgres SECURITY DEFINER functions are executed with the privileges
898 : // of the owner.
899 : // In anon extension this it is needed to access some GUCs, which are only accessible to
900 : // superuser. But we've patched postgres to allow db_owner to access them as well.
901 : // So we need to change owner of these functions to db_owner.
902 : let query = format!("
903 : SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};'
904 : from pg_proc p
905 : join pg_namespace nsp ON p.pronamespace = nsp.oid
906 : where nsp.nspname = 'anon';", db_owner);
907 :
908 : info!("change anon extension functions owner to db owner");
909 : db_client.simple_query(&query)?;
910 :
911 : // affects views as well
912 : let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
913 : info!("granting anon extension permissions with query: {}", query);
914 : db_client.simple_query(&query)?;
915 :
916 : let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
917 : info!("granting anon extension permissions with query: {}", query);
918 : db_client.simple_query(&query)?;
919 : }
920 : }
921 :
922 : Ok(())
923 : }
|