Line data Source code
1 : use std::fs::File;
2 : use std::path::Path;
3 : use std::str::FromStr;
4 :
5 : use anyhow::{anyhow, bail, Context, Result};
6 : use postgres::config::Config;
7 : use postgres::{Client, NoTls};
8 : use reqwest::StatusCode;
9 : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
10 :
11 : use crate::config;
12 : use crate::logger::inlinify;
13 : use crate::params::PG_HBA_ALL_MD5;
14 : use crate::pg_helpers::*;
15 :
16 : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
17 : use compute_api::spec::{ComputeSpec, PgIdent, Role};
18 :
19 : // Do control plane request and return response if any. In case of error it
20 : // returns a bool flag indicating whether it makes sense to retry the request
21 : // and a string with error message.
22 0 : fn do_control_plane_request(
23 0 : uri: &str,
24 0 : jwt: &str,
25 0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
26 0 : let resp = reqwest::blocking::Client::new()
27 0 : .get(uri)
28 0 : .header("Authorization", format!("Bearer {}", jwt))
29 0 : .send()
30 0 : .map_err(|e| {
31 0 : (
32 0 : true,
33 0 : format!("could not perform spec request to control plane: {}", e),
34 0 : )
35 0 : })?;
36 :
37 0 : match resp.status() {
38 0 : StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
39 0 : Ok(spec_resp) => Ok(spec_resp),
40 0 : Err(e) => Err((
41 0 : true,
42 0 : format!("could not deserialize control plane response: {}", e),
43 0 : )),
44 : },
45 : StatusCode::SERVICE_UNAVAILABLE => {
46 0 : Err((true, "control plane is temporarily unavailable".to_string()))
47 : }
48 : StatusCode::BAD_GATEWAY => {
49 : // We have a problem with intermittent 502 errors now
50 : // https://github.com/neondatabase/cloud/issues/2353
51 : // It's fine to retry GET request in this case.
52 0 : Err((true, "control plane request failed with 502".to_string()))
53 : }
54 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
55 : // or some internal failure happened. Doesn't make much sense to retry in this case.
56 0 : _ => Err((
57 0 : false,
58 0 : format!(
59 0 : "unexpected control plane response status code: {}",
60 0 : resp.status()
61 0 : ),
62 0 : )),
63 : }
64 0 : }
65 :
66 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
67 : /// env variable is set, it will be used for authorization.
68 0 : pub fn get_spec_from_control_plane(
69 0 : base_uri: &str,
70 0 : compute_id: &str,
71 0 : ) -> Result<Option<ComputeSpec>> {
72 0 : let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
73 0 : let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
74 0 : Ok(v) => v,
75 0 : Err(_) => "".to_string(),
76 : };
77 0 : let mut attempt = 1;
78 0 : let mut spec: Result<Option<ComputeSpec>> = Ok(None);
79 0 :
80 0 : info!("getting spec from control plane: {}", cp_uri);
81 :
82 : // Do 3 attempts to get spec from the control plane using the following logic:
83 : // - network error -> then retry
84 : // - compute id is unknown or any other error -> bail out
85 : // - no spec for compute yet (Empty state) -> return Ok(None)
86 : // - got spec -> return Ok(Some(spec))
87 0 : while attempt < 4 {
88 0 : spec = match do_control_plane_request(&cp_uri, &jwt) {
89 0 : Ok(spec_resp) => match spec_resp.status {
90 0 : ControlPlaneComputeStatus::Empty => Ok(None),
91 : ControlPlaneComputeStatus::Attached => {
92 0 : if let Some(spec) = spec_resp.spec {
93 0 : Ok(Some(spec))
94 : } else {
95 0 : bail!("compute is attached, but spec is empty")
96 : }
97 : }
98 : },
99 0 : Err((retry, msg)) => {
100 0 : if retry {
101 0 : Err(anyhow!(msg))
102 : } else {
103 0 : bail!(msg);
104 : }
105 : }
106 : };
107 :
108 0 : if let Err(e) = &spec {
109 0 : error!("attempt {} to get spec failed with: {}", attempt, e);
110 : } else {
111 0 : return spec;
112 : }
113 :
114 0 : attempt += 1;
115 0 : std::thread::sleep(std::time::Duration::from_millis(100));
116 : }
117 :
118 : // All attempts failed, return error.
119 0 : spec
120 0 : }
121 :
122 : /// Check `pg_hba.conf` and update if needed to allow external connections.
123 0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
124 0 : // XXX: consider making it a part of spec.json
125 0 : info!("checking pg_hba.conf");
126 0 : let pghba_path = pgdata_path.join("pg_hba.conf");
127 0 :
128 0 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
129 0 : info!("updated pg_hba.conf to allow external connections");
130 : } else {
131 0 : info!("pg_hba.conf is up-to-date");
132 : }
133 :
134 0 : Ok(())
135 0 : }
136 :
137 : /// Create a standby.signal file
138 0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
139 0 : // XXX: consider making it a part of spec.json
140 0 : info!("adding standby.signal");
141 0 : let signalfile = pgdata_path.join("standby.signal");
142 0 :
143 0 : if !signalfile.exists() {
144 0 : info!("created standby.signal");
145 0 : File::create(signalfile)?;
146 : } else {
147 0 : info!("reused pre-existing standby.signal");
148 : }
149 0 : Ok(())
150 0 : }
151 :
152 : /// Compute could be unexpectedly shut down, for example, during the
153 : /// database dropping. This leaves the database in the invalid state,
154 : /// which prevents new db creation with the same name. This function
155 : /// will clean it up before proceeding with catalog updates. All
156 : /// possible future cleanup operations may go here too.
157 0 : #[instrument(skip_all)]
158 : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
159 : let existing_dbs = get_existing_dbs(client)?;
160 :
161 : for (_, db) in existing_dbs {
162 : if db.invalid {
163 : // After recent commit in Postgres, interrupted DROP DATABASE
164 : // leaves the database in the invalid state. According to the
165 : // commit message, the only option for user is to drop it again.
166 : // See:
167 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
168 : //
169 : // Postgres Neon extension is done the way, that db is de-registered
170 : // in the control plane metadata only after it is dropped. So there is
171 : // a chance that it still thinks that db should exist. This means
172 : // that it will be re-created by `handle_databases()`. Yet, it's fine
173 : // as user can just repeat drop (in vanilla Postgres they would need
174 : // to do the same, btw).
175 : let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
176 : info!("dropping invalid database {}", db.name);
177 : client.execute(query.as_str(), &[])?;
178 : }
179 : }
180 :
181 : Ok(())
182 : }
183 :
184 : /// Given a cluster spec json and open transaction it handles roles creation,
185 : /// deletion and update.
186 0 : #[instrument(skip_all)]
187 : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
188 : let mut xact = client.transaction()?;
189 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
190 :
191 : // Print a list of existing Postgres roles (only in debug mode)
192 : if span_enabled!(Level::INFO) {
193 : let mut vec = Vec::new();
194 : for r in &existing_roles {
195 : vec.push(format!(
196 : "{}:{}",
197 : r.name,
198 : if r.encrypted_password.is_some() {
199 : "[FILTERED]"
200 : } else {
201 : "(null)"
202 : }
203 : ));
204 : }
205 :
206 : info!("postgres roles (total {}): {:?}", vec.len(), vec);
207 : }
208 :
209 : // Process delta operations first
210 : if let Some(ops) = &spec.delta_operations {
211 : info!("processing role renames");
212 : for op in ops {
213 : match op.action.as_ref() {
214 : "delete_role" => {
215 : // no-op now, roles will be deleted at the end of configuration
216 : }
217 : // Renaming role drops its password, since role name is
218 : // used as a salt there. It is important that this role
219 : // is recorded with a new `name` in the `roles` list.
220 : // Follow up roles update will set the new password.
221 : "rename_role" => {
222 : let new_name = op.new_name.as_ref().unwrap();
223 :
224 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
225 0 : if existing_roles.iter().any(|r| r.name == op.name) {
226 : let query: String = format!(
227 : "ALTER ROLE {} RENAME TO {}",
228 : op.name.pg_quote(),
229 : new_name.pg_quote()
230 : );
231 :
232 : warn!("renaming role '{}' to '{}'", op.name, new_name);
233 : xact.execute(query.as_str(), &[])?;
234 : }
235 : }
236 : _ => {}
237 : }
238 : }
239 : }
240 :
241 : // Refresh Postgres roles info to handle possible roles renaming
242 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
243 :
244 : info!(
245 : "handling cluster spec roles (total {})",
246 : spec.cluster.roles.len()
247 : );
248 : for role in &spec.cluster.roles {
249 : let name = &role.name;
250 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
251 0 : let pg_role = existing_roles.iter().find(|r| r.name == *name);
252 :
253 : enum RoleAction {
254 : None,
255 : Update,
256 : Create,
257 : }
258 : let action = if let Some(r) = pg_role {
259 : if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
260 : || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
261 : {
262 : RoleAction::Update
263 : } else if let Some(pg_pwd) = &r.encrypted_password {
264 : // Check whether password changed or not (trim 'md5' prefix first if any)
265 : //
266 : // This is a backward compatibility hack, which comes from the times when we were using
267 : // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
268 : // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
269 : // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
270 : // Here is the only place so far where we compare hashes, so it seems to be the best candidate
271 : // to place this compatibility layer.
272 : let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
273 : stripped
274 : } else {
275 : pg_pwd
276 : };
277 : if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
278 : RoleAction::Update
279 : } else {
280 : RoleAction::None
281 : }
282 : } else {
283 : RoleAction::None
284 : }
285 : } else {
286 : RoleAction::Create
287 : };
288 :
289 : match action {
290 : RoleAction::None => {}
291 : RoleAction::Update => {
292 : // This can be run on /every/ role! Not just ones created through the console.
293 : // This means that if you add some funny ALTER here that adds a permission,
294 : // this will get run even on user-created roles! This will result in different
295 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
296 : // now only grants LOGIN and changes the password. Please do not allow this branch
297 : // to do anything silly.
298 : let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
299 : query.push_str(&role.to_pg_options());
300 : xact.execute(query.as_str(), &[])?;
301 : }
302 : RoleAction::Create => {
303 : // This branch only runs when roles are created through the console, so it is
304 : // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
305 : // from neon_superuser.
306 : let mut query: String = format!(
307 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
308 : name.pg_quote()
309 : );
310 : info!("running role create query: '{}'", &query);
311 : query.push_str(&role.to_pg_options());
312 : xact.execute(query.as_str(), &[])?;
313 : }
314 : }
315 :
316 : if span_enabled!(Level::INFO) {
317 : let pwd = if role.encrypted_password.is_some() {
318 : "[FILTERED]"
319 : } else {
320 : "(null)"
321 : };
322 : let action_str = match action {
323 : RoleAction::None => "",
324 : RoleAction::Create => " -> create",
325 : RoleAction::Update => " -> update",
326 : };
327 : info!(" - {}:{}{}", name, pwd, action_str);
328 : }
329 : }
330 :
331 : xact.commit()?;
332 :
333 : Ok(())
334 : }
335 :
336 : /// Reassign all dependent objects and delete requested roles.
337 0 : #[instrument(skip_all)]
338 : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
339 : if let Some(ops) = &spec.delta_operations {
340 : // First, reassign all dependent objects to db owners.
341 : info!("reassigning dependent objects of to-be-deleted roles");
342 :
343 : // Fetch existing roles. We could've exported and used `existing_roles` from
344 : // `handle_roles()`, but we only make this list there before creating new roles.
345 : // Which is probably fine as we never create to-be-deleted roles, but that'd
346 : // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
347 : // buffers already, so this shouldn't be a big deal.
348 : let mut xact = client.transaction()?;
349 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
350 : xact.commit()?;
351 :
352 : for op in ops {
353 : // Check that role is still present in Postgres, as this could be a
354 : // restart with the same spec after role deletion.
355 0 : if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
356 : reassign_owned_objects(spec, connstr, &op.name)?;
357 : }
358 : }
359 :
360 : // Second, proceed with role deletions.
361 : info!("processing role deletions");
362 : let mut xact = client.transaction()?;
363 : for op in ops {
364 : // We do not check either role exists or not,
365 : // Postgres will take care of it for us
366 : if op.action == "delete_role" {
367 : let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
368 :
369 : warn!("deleting role '{}'", &op.name);
370 : xact.execute(query.as_str(), &[])?;
371 : }
372 : }
373 : xact.commit()?;
374 : }
375 :
376 : Ok(())
377 : }
378 :
379 0 : fn reassign_owned_objects_in_one_db(
380 0 : conf: Config,
381 0 : role_name: &PgIdent,
382 0 : db_owner: &PgIdent,
383 0 : ) -> Result<()> {
384 0 : let mut client = conf.connect(NoTls)?;
385 :
386 : // This will reassign all dependent objects to the db owner
387 0 : let reassign_query = format!(
388 0 : "REASSIGN OWNED BY {} TO {}",
389 0 : role_name.pg_quote(),
390 0 : db_owner.pg_quote()
391 0 : );
392 0 : info!(
393 0 : "reassigning objects owned by '{}' in db '{}' to '{}'",
394 0 : role_name,
395 0 : conf.get_dbname().unwrap_or(""),
396 : db_owner
397 : );
398 0 : client.simple_query(&reassign_query)?;
399 :
400 : // This now will only drop privileges of the role
401 0 : let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
402 0 : client.simple_query(&drop_query)?;
403 0 : Ok(())
404 0 : }
405 :
406 : // Reassign all owned objects in all databases to the owner of the database.
407 0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
408 0 : for db in &spec.cluster.databases {
409 0 : if db.owner != *role_name {
410 0 : let mut conf = Config::from_str(connstr)?;
411 0 : conf.dbname(&db.name);
412 0 : reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
413 0 : }
414 : }
415 :
416 : // Also handle case when there are no databases in the spec.
417 : // In this case we need to reassign objects in the default database.
418 0 : let conf = Config::from_str(connstr)?;
419 0 : let db_owner = PgIdent::from_str("cloud_admin")?;
420 0 : reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
421 :
422 0 : Ok(())
423 0 : }
424 :
425 : /// It follows mostly the same logic as `handle_roles()` excepting that we
426 : /// does not use an explicit transactions block, since major database operations
427 : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
428 : /// atomicity should be enough here due to the order of operations and various checks,
429 : /// which together provide us idempotency.
430 0 : #[instrument(skip_all)]
431 : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
432 : let existing_dbs = get_existing_dbs(client)?;
433 :
434 : // Print a list of existing Postgres databases (only in debug mode)
435 : if span_enabled!(Level::INFO) {
436 : let mut vec = Vec::new();
437 : for (dbname, db) in &existing_dbs {
438 : vec.push(format!("{}:{}", dbname, db.owner));
439 : }
440 : info!("postgres databases (total {}): {:?}", vec.len(), vec);
441 : }
442 :
443 : // Process delta operations first
444 : if let Some(ops) = &spec.delta_operations {
445 : info!("processing delta operations on databases");
446 : for op in ops {
447 : match op.action.as_ref() {
448 : // We do not check either DB exists or not,
449 : // Postgres will take care of it for us
450 : "delete_db" => {
451 : // In Postgres we can't drop a database if it is a template.
452 : // So we need to unset the template flag first, but it could
453 : // be a retry, so we could've already dropped the database.
454 : // Check that database exists first to make it idempotent.
455 : let unset_template_query: String = format!(
456 : "
457 : DO $$
458 : BEGIN
459 : IF EXISTS(
460 : SELECT 1
461 : FROM pg_catalog.pg_database
462 : WHERE datname = {}
463 : )
464 : THEN
465 : ALTER DATABASE {} is_template false;
466 : END IF;
467 : END
468 : $$;",
469 : escape_literal(&op.name),
470 : &op.name.pg_quote()
471 : );
472 : // Use FORCE to drop database even if there are active connections.
473 : // We run this from `cloud_admin`, so it should have enough privileges.
474 : // NB: there could be other db states, which prevent us from dropping
475 : // the database. For example, if db is used by any active subscription
476 : // or replication slot.
477 : // TODO: deal with it once we allow logical replication. Proper fix should
478 : // involve returning an error code to the control plane, so it could
479 : // figure out that this is a non-retryable error, return it to the user
480 : // and fail operation permanently.
481 : let drop_db_query: String = format!(
482 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
483 : &op.name.pg_quote()
484 : );
485 :
486 : warn!("deleting database '{}'", &op.name);
487 : client.execute(unset_template_query.as_str(), &[])?;
488 : client.execute(drop_db_query.as_str(), &[])?;
489 : }
490 : "rename_db" => {
491 : let new_name = op.new_name.as_ref().unwrap();
492 :
493 : if existing_dbs.contains_key(&op.name) {
494 : let query: String = format!(
495 : "ALTER DATABASE {} RENAME TO {}",
496 : op.name.pg_quote(),
497 : new_name.pg_quote()
498 : );
499 :
500 : warn!("renaming database '{}' to '{}'", op.name, new_name);
501 : client.execute(query.as_str(), &[])?;
502 : }
503 : }
504 : _ => {}
505 : }
506 : }
507 : }
508 :
509 : // Refresh Postgres databases info to handle possible renames
510 : let existing_dbs = get_existing_dbs(client)?;
511 :
512 : info!(
513 : "handling cluster spec databases (total {})",
514 : spec.cluster.databases.len()
515 : );
516 : for db in &spec.cluster.databases {
517 : let name = &db.name;
518 : let pg_db = existing_dbs.get(name);
519 :
520 : enum DatabaseAction {
521 : None,
522 : Update,
523 : Create,
524 : }
525 : let action = if let Some(r) = pg_db {
526 : // XXX: db owner name is returned as quoted string from Postgres,
527 : // when quoting is needed.
528 : let new_owner = if r.owner.starts_with('"') {
529 : db.owner.pg_quote()
530 : } else {
531 : db.owner.clone()
532 : };
533 :
534 : if new_owner != r.owner {
535 : // Update the owner
536 : DatabaseAction::Update
537 : } else {
538 : DatabaseAction::None
539 : }
540 : } else {
541 : DatabaseAction::Create
542 : };
543 :
544 : match action {
545 : DatabaseAction::None => {}
546 : DatabaseAction::Update => {
547 : let query: String = format!(
548 : "ALTER DATABASE {} OWNER TO {}",
549 : name.pg_quote(),
550 : db.owner.pg_quote()
551 : );
552 : let _guard = info_span!("executing", query).entered();
553 : client.execute(query.as_str(), &[])?;
554 : }
555 : DatabaseAction::Create => {
556 : let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
557 : query.push_str(&db.to_pg_options());
558 : let _guard = info_span!("executing", query).entered();
559 : client.execute(query.as_str(), &[])?;
560 : let grant_query: String = format!(
561 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
562 : name.pg_quote()
563 : );
564 : client.execute(grant_query.as_str(), &[])?;
565 : }
566 : };
567 :
568 : if span_enabled!(Level::INFO) {
569 : let action_str = match action {
570 : DatabaseAction::None => "",
571 : DatabaseAction::Create => " -> create",
572 : DatabaseAction::Update => " -> update",
573 : };
574 : info!(" - {}:{}{}", db.name, db.owner, action_str);
575 : }
576 : }
577 :
578 : Ok(())
579 : }
580 :
581 : /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
582 : /// to allow users creating trusted extensions and re-creating `public` schema, for example.
583 0 : #[instrument(skip_all)]
584 : pub fn handle_grants(
585 : spec: &ComputeSpec,
586 : client: &mut Client,
587 : connstr: &str,
588 : enable_anon_extension: bool,
589 : ) -> Result<()> {
590 : info!("modifying database permissions");
591 : let existing_dbs = get_existing_dbs(client)?;
592 :
593 : // Do some per-database access adjustments. We'd better do this at db creation time,
594 : // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
595 : // atomically.
596 : for db in &spec.cluster.databases {
597 : match existing_dbs.get(&db.name) {
598 : Some(pg_db) => {
599 : if pg_db.restrict_conn || pg_db.invalid {
600 : info!(
601 : "skipping grants for db {} (invalid: {}, connections not allowed: {})",
602 : db.name, pg_db.invalid, pg_db.restrict_conn
603 : );
604 : continue;
605 : }
606 : }
607 : None => {
608 : bail!(
609 : "database {} doesn't exist in Postgres after handle_databases()",
610 : db.name
611 : );
612 : }
613 : }
614 :
615 : let mut conf = Config::from_str(connstr)?;
616 : conf.dbname(&db.name);
617 :
618 : let mut db_client = conf.connect(NoTls)?;
619 :
620 : // This will only change ownership on the schema itself, not the objects
621 : // inside it. Without it owner of the `public` schema will be `cloud_admin`
622 : // and database owner cannot do anything with it. SQL procedure ensures
623 : // that it won't error out if schema `public` doesn't exist.
624 : let alter_query = format!(
625 : "DO $$\n\
626 : DECLARE\n\
627 : schema_owner TEXT;\n\
628 : BEGIN\n\
629 : IF EXISTS(\n\
630 : SELECT nspname\n\
631 : FROM pg_catalog.pg_namespace\n\
632 : WHERE nspname = 'public'\n\
633 : )\n\
634 : THEN\n\
635 : SELECT nspowner::regrole::text\n\
636 : FROM pg_catalog.pg_namespace\n\
637 : WHERE nspname = 'public'\n\
638 : INTO schema_owner;\n\
639 : \n\
640 : IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
641 : THEN\n\
642 : ALTER SCHEMA public OWNER TO {};\n\
643 : END IF;\n\
644 : END IF;\n\
645 : END\n\
646 : $$;",
647 : db.owner.pg_quote()
648 : );
649 : db_client.simple_query(&alter_query)?;
650 :
651 : // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
652 : // This is needed because since postgres 15 this privilege is removed by default.
653 : // TODO: web_access isn't created for almost 1 year. It could be that we have
654 : // active users of 1 year old projects, but hopefully not, so check it and
655 : // remove this code if possible. The worst thing that could happen is that
656 : // user won't be able to use public schema in NEW databases created in the
657 : // very OLD project.
658 : //
659 : // Also, alter default permissions so that relations created by extensions can be
660 : // used by neon_superuser without permission issues.
661 : let grant_query = "DO $$\n\
662 : BEGIN\n\
663 : IF EXISTS(\n\
664 : SELECT nspname\n\
665 : FROM pg_catalog.pg_namespace\n\
666 : WHERE nspname = 'public'\n\
667 : ) AND\n\
668 : current_setting('server_version_num')::int/10000 >= 15\n\
669 : THEN\n\
670 : IF EXISTS(\n\
671 : SELECT rolname\n\
672 : FROM pg_catalog.pg_roles\n\
673 : WHERE rolname = 'web_access'\n\
674 : )\n\
675 : THEN\n\
676 : GRANT CREATE ON SCHEMA public TO web_access;\n\
677 : END IF;\n\
678 : END IF;\n\
679 : IF EXISTS(\n\
680 : SELECT nspname\n\
681 : FROM pg_catalog.pg_namespace\n\
682 : WHERE nspname = 'public'\n\
683 : )\n\
684 : THEN\n\
685 : ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;\n\
686 : ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;\n\
687 : END IF;\n\
688 : END\n\
689 : $$;"
690 : .to_string();
691 :
692 : info!(
693 : "grant query for db {} : {}",
694 : &db.name,
695 : inlinify(&grant_query)
696 : );
697 : db_client.simple_query(&grant_query)?;
698 :
699 : // it is important to run this after all grants
700 : if enable_anon_extension {
701 : handle_extension_anon(spec, &db.owner, &mut db_client, false)
702 : .context("handle_grants handle_extension_anon")?;
703 : }
704 : }
705 :
706 : Ok(())
707 : }
708 :
709 : /// Create required system extensions
710 0 : #[instrument(skip_all)]
711 : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
712 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
713 : if libs.contains("pg_stat_statements") {
714 : // Create extension only if this compute really needs it
715 : let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
716 : info!("creating system extensions with query: {}", query);
717 : client.simple_query(query)?;
718 : }
719 : }
720 :
721 : Ok(())
722 : }
723 :
724 : /// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
725 0 : #[instrument(skip_all)]
726 : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
727 : info!("handle extension neon");
728 :
729 : let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
730 : client.simple_query(query)?;
731 :
732 : query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
733 : info!("create neon extension with query: {}", query);
734 : client.simple_query(query)?;
735 :
736 : query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
737 : client.simple_query(query)?;
738 :
739 : query = "ALTER EXTENSION neon SET SCHEMA neon";
740 : info!("alter neon extension schema with query: {}", query);
741 : client.simple_query(query)?;
742 :
743 : // this will be a no-op if extension is already up to date,
744 : // which may happen in two cases:
745 : // - extension was just installed
746 : // - extension was already installed and is up to date
747 : let query = "ALTER EXTENSION neon UPDATE";
748 : info!("update neon extension version with query: {}", query);
749 : if let Err(e) = client.simple_query(query) {
750 : error!(
751 : "failed to upgrade neon extension during `handle_extension_neon`: {}",
752 : e
753 : );
754 : }
755 :
756 : Ok(())
757 : }
758 :
759 0 : #[instrument(skip_all)]
760 : pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
761 : info!("handle neon extension upgrade");
762 : let query = "ALTER EXTENSION neon UPDATE";
763 : info!("update neon extension version with query: {}", query);
764 : client.simple_query(query)?;
765 :
766 : Ok(())
767 : }
768 :
769 0 : #[instrument(skip_all)]
770 : pub fn handle_migrations(client: &mut Client) -> Result<()> {
771 : info!("handle migrations");
772 :
773 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
774 : // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
775 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
776 :
777 : // Add new migrations in numerical order.
778 : let migrations = [
779 : include_str!("./migrations/0000-neon_superuser_bypass_rls.sql"),
780 : include_str!("./migrations/0001-alter_roles.sql"),
781 : include_str!("./migrations/0002-grant_pg_create_subscription_to_neon_superuser.sql"),
782 : include_str!("./migrations/0003-grant_pg_monitor_to_neon_superuser.sql"),
783 : include_str!("./migrations/0004-grant_all_on_tables_to_neon_superuser.sql"),
784 : include_str!("./migrations/0005-grant_all_on_sequences_to_neon_superuser.sql"),
785 : include_str!(
786 : "./migrations/0006-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
787 : ),
788 : include_str!(
789 : "./migrations/0007-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
790 : ),
791 : include_str!("./migrations/0008-revoke_replication_for_previously_allowed_roles.sql"),
792 : ];
793 :
794 0 : let mut func = || {
795 0 : let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
796 0 : client.simple_query(query)?;
797 :
798 0 : let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
799 0 : client.simple_query(query)?;
800 :
801 0 : let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
802 0 : client.simple_query(query)?;
803 :
804 0 : let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
805 0 : client.simple_query(query)?;
806 :
807 0 : let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
808 0 : client.simple_query(query)?;
809 0 : Ok::<_, anyhow::Error>(())
810 0 : };
811 : func().context("handle_migrations prepare")?;
812 :
813 : let query = "SELECT id FROM neon_migration.migration_id";
814 : let row = client
815 : .query_one(query, &[])
816 : .context("handle_migrations get migration_id")?;
817 : let mut current_migration: usize = row.get::<&str, i64>("id") as usize;
818 : let starting_migration_id = current_migration;
819 :
820 : let query = "BEGIN";
821 : client
822 : .simple_query(query)
823 : .context("handle_migrations begin")?;
824 :
825 : while current_migration < migrations.len() {
826 : let migration = &migrations[current_migration];
827 : if migration.starts_with("-- SKIP") {
828 : info!("Skipping migration id={}", current_migration);
829 : } else {
830 : info!(
831 : "Running migration id={}:\n{}\n",
832 : current_migration, migration
833 : );
834 0 : client.simple_query(migration).with_context(|| {
835 0 : format!("handle_migrations current_migration={}", current_migration)
836 0 : })?;
837 : }
838 : current_migration += 1;
839 : }
840 : let setval = format!(
841 : "UPDATE neon_migration.migration_id SET id={}",
842 : migrations.len()
843 : );
844 : client
845 : .simple_query(&setval)
846 : .context("handle_migrations update id")?;
847 :
848 : let query = "COMMIT";
849 : client
850 : .simple_query(query)
851 : .context("handle_migrations commit")?;
852 :
853 : info!(
854 : "Ran {} migrations",
855 : (migrations.len() - starting_migration_id)
856 : );
857 :
858 : Ok(())
859 : }
860 :
861 : /// Connect to the database as superuser and pre-create anon extension
862 : /// if it is present in shared_preload_libraries
863 0 : #[instrument(skip_all)]
864 : pub fn handle_extension_anon(
865 : spec: &ComputeSpec,
866 : db_owner: &str,
867 : db_client: &mut Client,
868 : grants_only: bool,
869 : ) -> Result<()> {
870 : info!("handle extension anon");
871 :
872 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
873 : if libs.contains("anon") {
874 : if !grants_only {
875 : // check if extension is already initialized using anon.is_initialized()
876 : let query = "SELECT anon.is_initialized()";
877 : match db_client.query(query, &[]) {
878 : Ok(rows) => {
879 : if !rows.is_empty() {
880 : let is_initialized: bool = rows[0].get(0);
881 : if is_initialized {
882 : info!("anon extension is already initialized");
883 : return Ok(());
884 : }
885 : }
886 : }
887 : Err(e) => {
888 : warn!(
889 : "anon extension is_installed check failed with expected error: {}",
890 : e
891 : );
892 : }
893 : };
894 :
895 : // Create anon extension if this compute needs it
896 : // Users cannot create it themselves, because superuser is required.
897 : let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
898 : info!("creating anon extension with query: {}", query);
899 : match db_client.query(query, &[]) {
900 : Ok(_) => {}
901 : Err(e) => {
902 : error!("anon extension creation failed with error: {}", e);
903 : return Ok(());
904 : }
905 : }
906 :
907 : // check that extension is installed
908 : query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
909 : let rows = db_client.query(query, &[])?;
910 : if rows.is_empty() {
911 : error!("anon extension is not installed");
912 : return Ok(());
913 : }
914 :
915 : // Initialize anon extension
916 : // This also requires superuser privileges, so users cannot do it themselves.
917 : query = "SELECT anon.init()";
918 : match db_client.query(query, &[]) {
919 : Ok(_) => {}
920 : Err(e) => {
921 : error!("anon.init() failed with error: {}", e);
922 : return Ok(());
923 : }
924 : }
925 : }
926 :
927 : // check that extension is installed, if not bail early
928 : let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
929 : match db_client.query(query, &[]) {
930 : Ok(rows) => {
931 : if rows.is_empty() {
932 : error!("anon extension is not installed");
933 : return Ok(());
934 : }
935 : }
936 : Err(e) => {
937 : error!("anon extension check failed with error: {}", e);
938 : return Ok(());
939 : }
940 : };
941 :
942 : let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
943 : info!("granting anon extension permissions with query: {}", query);
944 : db_client.simple_query(&query)?;
945 :
946 : // Grant permissions to db_owner to use anon extension functions
947 : let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
948 : info!("granting anon extension permissions with query: {}", query);
949 : db_client.simple_query(&query)?;
950 :
951 : // This is needed, because some functions are defined as SECURITY DEFINER.
952 : // In Postgres SECURITY DEFINER functions are executed with the privileges
953 : // of the owner.
954 : // In anon extension this it is needed to access some GUCs, which are only accessible to
955 : // superuser. But we've patched postgres to allow db_owner to access them as well.
956 : // So we need to change owner of these functions to db_owner.
957 : let query = format!("
958 : SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};'
959 : from pg_proc p
960 : join pg_namespace nsp ON p.pronamespace = nsp.oid
961 : where nsp.nspname = 'anon';", db_owner);
962 :
963 : info!("change anon extension functions owner to db owner");
964 : db_client.simple_query(&query)?;
965 :
966 : // affects views as well
967 : let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
968 : info!("granting anon extension permissions with query: {}", query);
969 : db_client.simple_query(&query)?;
970 :
971 : let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
972 : info!("granting anon extension permissions with query: {}", query);
973 : db_client.simple_query(&query)?;
974 : }
975 : }
976 :
977 : Ok(())
978 : }
|