Line data Source code
1 : use std::fs::File;
2 : use std::path::Path;
3 : use std::str::FromStr;
4 :
5 : use anyhow::{anyhow, bail, Result};
6 : use postgres::config::Config;
7 : use postgres::{Client, NoTls};
8 : use reqwest::StatusCode;
9 : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
10 :
11 : use crate::config;
12 : use crate::logger::inlinify;
13 : use crate::params::PG_HBA_ALL_MD5;
14 : use crate::pg_helpers::*;
15 :
16 : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
17 : use compute_api::spec::{ComputeSpec, PgIdent, Role};
18 :
19 : // Do control plane request and return response if any. In case of error it
20 : // returns a bool flag indicating whether it makes sense to retry the request
21 : // and a string with error message.
22 0 : fn do_control_plane_request(
23 0 : uri: &str,
24 0 : jwt: &str,
25 0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
26 0 : let resp = reqwest::blocking::Client::new()
27 0 : .get(uri)
28 0 : .header("Authorization", format!("Bearer {}", jwt))
29 0 : .send()
30 0 : .map_err(|e| {
31 0 : (
32 0 : true,
33 0 : format!("could not perform spec request to control plane: {}", e),
34 0 : )
35 0 : })?;
36 :
37 0 : match resp.status() {
38 0 : StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
39 0 : Ok(spec_resp) => Ok(spec_resp),
40 0 : Err(e) => Err((
41 0 : true,
42 0 : format!("could not deserialize control plane response: {}", e),
43 0 : )),
44 : },
45 : StatusCode::SERVICE_UNAVAILABLE => {
46 0 : Err((true, "control plane is temporarily unavailable".to_string()))
47 : }
48 : StatusCode::BAD_GATEWAY => {
49 : // We have a problem with intermittent 502 errors now
50 : // https://github.com/neondatabase/cloud/issues/2353
51 : // It's fine to retry GET request in this case.
52 0 : Err((true, "control plane request failed with 502".to_string()))
53 : }
54 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
55 : // or some internal failure happened. Doesn't make much sense to retry in this case.
56 0 : _ => Err((
57 0 : false,
58 0 : format!(
59 0 : "unexpected control plane response status code: {}",
60 0 : resp.status()
61 0 : ),
62 0 : )),
63 : }
64 0 : }
65 :
66 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
67 : /// env variable is set, it will be used for authorization.
68 0 : pub fn get_spec_from_control_plane(
69 0 : base_uri: &str,
70 0 : compute_id: &str,
71 0 : ) -> Result<Option<ComputeSpec>> {
72 0 : let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
73 0 : let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
74 0 : Ok(v) => v,
75 0 : Err(_) => "".to_string(),
76 : };
77 0 : let mut attempt = 1;
78 0 : let mut spec: Result<Option<ComputeSpec>> = Ok(None);
79 0 :
80 0 : info!("getting spec from control plane: {}", cp_uri);
81 :
82 : // Do 3 attempts to get spec from the control plane using the following logic:
83 : // - network error -> then retry
84 : // - compute id is unknown or any other error -> bail out
85 : // - no spec for compute yet (Empty state) -> return Ok(None)
86 : // - got spec -> return Ok(Some(spec))
87 0 : while attempt < 4 {
88 0 : spec = match do_control_plane_request(&cp_uri, &jwt) {
89 0 : Ok(spec_resp) => match spec_resp.status {
90 0 : ControlPlaneComputeStatus::Empty => Ok(None),
91 : ControlPlaneComputeStatus::Attached => {
92 0 : if let Some(spec) = spec_resp.spec {
93 0 : Ok(Some(spec))
94 : } else {
95 0 : bail!("compute is attached, but spec is empty")
96 : }
97 : }
98 : },
99 0 : Err((retry, msg)) => {
100 0 : if retry {
101 0 : Err(anyhow!(msg))
102 : } else {
103 0 : bail!(msg);
104 : }
105 : }
106 : };
107 :
108 0 : if let Err(e) = &spec {
109 0 : error!("attempt {} to get spec failed with: {}", attempt, e);
110 : } else {
111 0 : return spec;
112 : }
113 :
114 0 : attempt += 1;
115 0 : std::thread::sleep(std::time::Duration::from_millis(100));
116 : }
117 :
118 : // All attempts failed, return error.
119 0 : spec
120 0 : }
121 :
122 : /// Check `pg_hba.conf` and update if needed to allow external connections.
123 571 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
124 571 : // XXX: consider making it a part of spec.json
125 571 : info!("checking pg_hba.conf");
126 571 : let pghba_path = pgdata_path.join("pg_hba.conf");
127 571 :
128 571 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
129 571 : info!("updated pg_hba.conf to allow external connections");
130 : } else {
131 0 : info!("pg_hba.conf is up-to-date");
132 : }
133 :
134 571 : Ok(())
135 571 : }
136 :
137 : /// Create a standby.signal file
138 46 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
139 46 : // XXX: consider making it a part of spec.json
140 46 : info!("adding standby.signal");
141 46 : let signalfile = pgdata_path.join("standby.signal");
142 46 :
143 46 : if !signalfile.exists() {
144 46 : info!("created standby.signal");
145 46 : File::create(signalfile)?;
146 : } else {
147 0 : info!("reused pre-existing standby.signal");
148 : }
149 46 : Ok(())
150 46 : }
151 :
152 : /// Compute could be unexpectedly shut down, for example, during the
153 : /// database dropping. This leaves the database in the invalid state,
154 : /// which prevents new db creation with the same name. This function
155 : /// will clean it up before proceeding with catalog updates. All
156 : /// possible future cleanup operations may go here too.
157 239 : #[instrument(skip_all)]
158 : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
159 : let existing_dbs = get_existing_dbs(client)?;
160 :
161 : for (_, db) in existing_dbs {
162 : if db.invalid {
163 : // After recent commit in Postgres, interrupted DROP DATABASE
164 : // leaves the database in the invalid state. According to the
165 : // commit message, the only option for user is to drop it again.
166 : // See:
167 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
168 : //
169 : // Postgres Neon extension is done the way, that db is de-registered
170 : // in the control plane metadata only after it is dropped. So there is
171 : // a chance that it still thinks that db should exist. This means
172 : // that it will be re-created by `handle_databases()`. Yet, it's fine
173 : // as user can just repeat drop (in vanilla Postgres they would need
174 : // to do the same, btw).
175 : let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
176 1 : info!("dropping invalid database {}", db.name);
177 : client.execute(query.as_str(), &[])?;
178 : }
179 : }
180 :
181 : Ok(())
182 : }
183 :
184 : /// Given a cluster spec json and open transaction it handles roles creation,
185 : /// deletion and update.
186 239 : #[instrument(skip_all)]
187 : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
188 : let mut xact = client.transaction()?;
189 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
190 :
191 : // Print a list of existing Postgres roles (only in debug mode)
192 239 : if span_enabled!(Level::INFO) {
193 : let mut vec = Vec::new();
194 : for r in &existing_roles {
195 : vec.push(format!(
196 : "{}:{}",
197 : r.name,
198 : if r.encrypted_password.is_some() {
199 : "[FILTERED]"
200 : } else {
201 : "(null)"
202 : }
203 : ));
204 : }
205 :
206 239 : info!("postgres roles (total {}): {:?}", vec.len(), vec);
207 : }
208 :
209 : // Process delta operations first
210 : if let Some(ops) = &spec.delta_operations {
211 0 : info!("processing role renames");
212 : for op in ops {
213 : match op.action.as_ref() {
214 : "delete_role" => {
215 : // no-op now, roles will be deleted at the end of configuration
216 : }
217 : // Renaming role drops its password, since role name is
218 : // used as a salt there. It is important that this role
219 : // is recorded with a new `name` in the `roles` list.
220 : // Follow up roles update will set the new password.
221 : "rename_role" => {
222 : let new_name = op.new_name.as_ref().unwrap();
223 :
224 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
225 0 : if existing_roles.iter().any(|r| r.name == op.name) {
226 : let query: String = format!(
227 : "ALTER ROLE {} RENAME TO {}",
228 : op.name.pg_quote(),
229 : new_name.pg_quote()
230 : );
231 :
232 0 : warn!("renaming role '{}' to '{}'", op.name, new_name);
233 : xact.execute(query.as_str(), &[])?;
234 : }
235 : }
236 : _ => {}
237 : }
238 : }
239 : }
240 :
241 : // Refresh Postgres roles info to handle possible roles renaming
242 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
243 :
244 239 : info!(
245 239 : "handling cluster spec roles (total {})",
246 239 : spec.cluster.roles.len()
247 239 : );
248 : for role in &spec.cluster.roles {
249 : let name = &role.name;
250 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
251 0 : let pg_role = existing_roles.iter().find(|r| r.name == *name);
252 :
253 : enum RoleAction {
254 : None,
255 : Update,
256 : Create,
257 : }
258 : let action = if let Some(r) = pg_role {
259 : if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
260 : || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
261 : {
262 : RoleAction::Update
263 : } else if let Some(pg_pwd) = &r.encrypted_password {
264 : // Check whether password changed or not (trim 'md5' prefix first if any)
265 : //
266 : // This is a backward compatibility hack, which comes from the times when we were using
267 : // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
268 : // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
269 : // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
270 : // Here is the only place so far where we compare hashes, so it seems to be the best candidate
271 : // to place this compatibility layer.
272 : let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
273 : stripped
274 : } else {
275 : pg_pwd
276 : };
277 : if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
278 : RoleAction::Update
279 : } else {
280 : RoleAction::None
281 : }
282 : } else {
283 : RoleAction::None
284 : }
285 : } else {
286 : RoleAction::Create
287 : };
288 :
289 : match action {
290 : RoleAction::None => {}
291 : RoleAction::Update => {
292 : // This can be run on /every/ role! Not just ones created through the console.
293 : // This means that if you add some funny ALTER here that adds a permission,
294 : // this will get run even on user-created roles! This will result in different
295 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
296 : // now only grants LOGIN and changes the password. Please do not allow this branch
297 : // to do anything silly.
298 : let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
299 : query.push_str(&role.to_pg_options());
300 : xact.execute(query.as_str(), &[])?;
301 : }
302 : RoleAction::Create => {
303 : // This branch only runs when roles are created through the console, so it is
304 : // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
305 : // from neon_superuser.
306 : let mut query: String = format!(
307 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
308 : name.pg_quote()
309 : );
310 0 : info!("running role create query: '{}'", &query);
311 : query.push_str(&role.to_pg_options());
312 : xact.execute(query.as_str(), &[])?;
313 : }
314 : }
315 :
316 0 : if span_enabled!(Level::INFO) {
317 : let pwd = if role.encrypted_password.is_some() {
318 : "[FILTERED]"
319 : } else {
320 : "(null)"
321 : };
322 : let action_str = match action {
323 : RoleAction::None => "",
324 : RoleAction::Create => " -> create",
325 : RoleAction::Update => " -> update",
326 : };
327 0 : info!(" - {}:{}{}", name, pwd, action_str);
328 : }
329 : }
330 :
331 : xact.commit()?;
332 :
333 : Ok(())
334 : }
335 :
336 : /// Reassign all dependent objects and delete requested roles.
337 239 : #[instrument(skip_all)]
338 : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
339 : if let Some(ops) = &spec.delta_operations {
340 : // First, reassign all dependent objects to db owners.
341 0 : info!("reassigning dependent objects of to-be-deleted roles");
342 :
343 : // Fetch existing roles. We could've exported and used `existing_roles` from
344 : // `handle_roles()`, but we only make this list there before creating new roles.
345 : // Which is probably fine as we never create to-be-deleted roles, but that'd
346 : // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
347 : // buffers already, so this shouldn't be a big deal.
348 : let mut xact = client.transaction()?;
349 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
350 : xact.commit()?;
351 :
352 : for op in ops {
353 : // Check that role is still present in Postgres, as this could be a
354 : // restart with the same spec after role deletion.
355 0 : if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
356 : reassign_owned_objects(spec, connstr, &op.name)?;
357 : }
358 : }
359 :
360 : // Second, proceed with role deletions.
361 0 : info!("processing role deletions");
362 : let mut xact = client.transaction()?;
363 : for op in ops {
364 : // We do not check either role exists or not,
365 : // Postgres will take care of it for us
366 : if op.action == "delete_role" {
367 : let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
368 :
369 0 : warn!("deleting role '{}'", &op.name);
370 : xact.execute(query.as_str(), &[])?;
371 : }
372 : }
373 : xact.commit()?;
374 : }
375 :
376 : Ok(())
377 : }
378 :
379 0 : fn reassign_owned_objects_in_one_db(
380 0 : conf: Config,
381 0 : role_name: &PgIdent,
382 0 : db_owner: &PgIdent,
383 0 : ) -> Result<()> {
384 0 : let mut client = conf.connect(NoTls)?;
385 :
386 : // This will reassign all dependent objects to the db owner
387 0 : let reassign_query = format!(
388 0 : "REASSIGN OWNED BY {} TO {}",
389 0 : role_name.pg_quote(),
390 0 : db_owner.pg_quote()
391 0 : );
392 0 : info!(
393 0 : "reassigning objects owned by '{}' in db '{}' to '{}'",
394 0 : role_name,
395 0 : conf.get_dbname().unwrap_or(""),
396 0 : db_owner
397 0 : );
398 0 : client.simple_query(&reassign_query)?;
399 :
400 : // This now will only drop privileges of the role
401 0 : let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
402 0 : client.simple_query(&drop_query)?;
403 0 : Ok(())
404 0 : }
405 :
406 : // Reassign all owned objects in all databases to the owner of the database.
407 0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
408 0 : for db in &spec.cluster.databases {
409 0 : if db.owner != *role_name {
410 0 : let mut conf = Config::from_str(connstr)?;
411 0 : conf.dbname(&db.name);
412 0 : reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
413 0 : }
414 : }
415 :
416 : // Also handle case when there are no databases in the spec.
417 : // In this case we need to reassign objects in the default database.
418 0 : let conf = Config::from_str(connstr)?;
419 0 : let db_owner = PgIdent::from_str("cloud_admin")?;
420 0 : reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
421 :
422 0 : Ok(())
423 0 : }
424 :
425 : /// It follows mostly the same logic as `handle_roles()` excepting that we
426 : /// does not use an explicit transactions block, since major database operations
427 : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
428 : /// atomicity should be enough here due to the order of operations and various checks,
429 : /// which together provide us idempotency.
430 239 : #[instrument(skip_all)]
431 : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
432 : let existing_dbs = get_existing_dbs(client)?;
433 :
434 : // Print a list of existing Postgres databases (only in debug mode)
435 239 : if span_enabled!(Level::INFO) {
436 : let mut vec = Vec::new();
437 : for (dbname, db) in &existing_dbs {
438 : vec.push(format!("{}:{}", dbname, db.owner));
439 : }
440 239 : info!("postgres databases (total {}): {:?}", vec.len(), vec);
441 : }
442 :
443 : // Process delta operations first
444 : if let Some(ops) = &spec.delta_operations {
445 0 : info!("processing delta operations on databases");
446 : for op in ops {
447 : match op.action.as_ref() {
448 : // We do not check either DB exists or not,
449 : // Postgres will take care of it for us
450 : "delete_db" => {
451 : // In Postgres we can't drop a database if it is a template.
452 : // So we need to unset the template flag first, but it could
453 : // be a retry, so we could've already dropped the database.
454 : // Check that database exists first to make it idempotent.
455 : let unset_template_query: String = format!(
456 : "
457 : DO $$
458 : BEGIN
459 : IF EXISTS(
460 : SELECT 1
461 : FROM pg_catalog.pg_database
462 : WHERE datname = {}
463 : )
464 : THEN
465 : ALTER DATABASE {} is_template false;
466 : END IF;
467 : END
468 : $$;",
469 : escape_literal(&op.name),
470 : &op.name.pg_quote()
471 : );
472 : // Use FORCE to drop database even if there are active connections.
473 : // We run this from `cloud_admin`, so it should have enough privileges.
474 : // NB: there could be other db states, which prevent us from dropping
475 : // the database. For example, if db is used by any active subscription
476 : // or replication slot.
477 : // TODO: deal with it once we allow logical replication. Proper fix should
478 : // involve returning an error code to the control plane, so it could
479 : // figure out that this is a non-retryable error, return it to the user
480 : // and fail operation permanently.
481 : let drop_db_query: String = format!(
482 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
483 : &op.name.pg_quote()
484 : );
485 :
486 0 : warn!("deleting database '{}'", &op.name);
487 : client.execute(unset_template_query.as_str(), &[])?;
488 : client.execute(drop_db_query.as_str(), &[])?;
489 : }
490 : "rename_db" => {
491 : let new_name = op.new_name.as_ref().unwrap();
492 :
493 : if existing_dbs.get(&op.name).is_some() {
494 : let query: String = format!(
495 : "ALTER DATABASE {} RENAME TO {}",
496 : op.name.pg_quote(),
497 : new_name.pg_quote()
498 : );
499 :
500 0 : warn!("renaming database '{}' to '{}'", op.name, new_name);
501 : client.execute(query.as_str(), &[])?;
502 : }
503 : }
504 : _ => {}
505 : }
506 : }
507 : }
508 :
509 : // Refresh Postgres databases info to handle possible renames
510 : let existing_dbs = get_existing_dbs(client)?;
511 :
512 239 : info!(
513 239 : "handling cluster spec databases (total {})",
514 239 : spec.cluster.databases.len()
515 239 : );
516 : for db in &spec.cluster.databases {
517 : let name = &db.name;
518 : let pg_db = existing_dbs.get(name);
519 :
520 : enum DatabaseAction {
521 : None,
522 : Update,
523 : Create,
524 : }
525 : let action = if let Some(r) = pg_db {
526 : // XXX: db owner name is returned as quoted string from Postgres,
527 : // when quoting is needed.
528 : let new_owner = if r.owner.starts_with('"') {
529 : db.owner.pg_quote()
530 : } else {
531 : db.owner.clone()
532 : };
533 :
534 : if new_owner != r.owner {
535 : // Update the owner
536 : DatabaseAction::Update
537 : } else {
538 : DatabaseAction::None
539 : }
540 : } else {
541 : DatabaseAction::Create
542 : };
543 :
544 : match action {
545 : DatabaseAction::None => {}
546 : DatabaseAction::Update => {
547 : let query: String = format!(
548 : "ALTER DATABASE {} OWNER TO {}",
549 : name.pg_quote(),
550 : db.owner.pg_quote()
551 : );
552 : let _guard = info_span!("executing", query).entered();
553 : client.execute(query.as_str(), &[])?;
554 : }
555 : DatabaseAction::Create => {
556 : let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
557 : query.push_str(&db.to_pg_options());
558 : let _guard = info_span!("executing", query).entered();
559 : client.execute(query.as_str(), &[])?;
560 : let grant_query: String = format!(
561 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
562 : name.pg_quote()
563 : );
564 : client.execute(grant_query.as_str(), &[])?;
565 : }
566 : };
567 :
568 0 : if span_enabled!(Level::INFO) {
569 : let action_str = match action {
570 : DatabaseAction::None => "",
571 : DatabaseAction::Create => " -> create",
572 : DatabaseAction::Update => " -> update",
573 : };
574 0 : info!(" - {}:{}{}", db.name, db.owner, action_str);
575 : }
576 : }
577 :
578 : Ok(())
579 : }
580 :
581 : /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
582 : /// to allow users creating trusted extensions and re-creating `public` schema, for example.
583 239 : #[instrument(skip_all)]
584 : pub fn handle_grants(
585 : spec: &ComputeSpec,
586 : client: &mut Client,
587 : connstr: &str,
588 : enable_anon_extension: bool,
589 : ) -> Result<()> {
590 239 : info!("modifying database permissions");
591 : let existing_dbs = get_existing_dbs(client)?;
592 :
593 : // Do some per-database access adjustments. We'd better do this at db creation time,
594 : // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
595 : // atomically.
596 : for db in &spec.cluster.databases {
597 : match existing_dbs.get(&db.name) {
598 : Some(pg_db) => {
599 : if pg_db.restrict_conn || pg_db.invalid {
600 0 : info!(
601 0 : "skipping grants for db {} (invalid: {}, connections not allowed: {})",
602 0 : db.name, pg_db.invalid, pg_db.restrict_conn
603 0 : );
604 : continue;
605 : }
606 : }
607 : None => {
608 : bail!(
609 : "database {} doesn't exist in Postgres after handle_databases()",
610 : db.name
611 : );
612 : }
613 : }
614 :
615 : let mut conf = Config::from_str(connstr)?;
616 : conf.dbname(&db.name);
617 :
618 : let mut db_client = conf.connect(NoTls)?;
619 :
620 : // This will only change ownership on the schema itself, not the objects
621 : // inside it. Without it owner of the `public` schema will be `cloud_admin`
622 : // and database owner cannot do anything with it. SQL procedure ensures
623 : // that it won't error out if schema `public` doesn't exist.
624 : let alter_query = format!(
625 : "DO $$\n\
626 : DECLARE\n\
627 : schema_owner TEXT;\n\
628 : BEGIN\n\
629 : IF EXISTS(\n\
630 : SELECT nspname\n\
631 : FROM pg_catalog.pg_namespace\n\
632 : WHERE nspname = 'public'\n\
633 : )\n\
634 : THEN\n\
635 : SELECT nspowner::regrole::text\n\
636 : FROM pg_catalog.pg_namespace\n\
637 : WHERE nspname = 'public'\n\
638 : INTO schema_owner;\n\
639 : \n\
640 : IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
641 : THEN\n\
642 : ALTER SCHEMA public OWNER TO {};\n\
643 : END IF;\n\
644 : END IF;\n\
645 : END\n\
646 : $$;",
647 : db.owner.pg_quote()
648 : );
649 : db_client.simple_query(&alter_query)?;
650 :
651 : // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
652 : // This is needed because since postgres 15 this privilege is removed by default.
653 : // TODO: web_access isn't created for almost 1 year. It could be that we have
654 : // active users of 1 year old projects, but hopefully not, so check it and
655 : // remove this code if possible. The worst thing that could happen is that
656 : // user won't be able to use public schema in NEW databases created in the
657 : // very OLD project.
658 : let grant_query = "DO $$\n\
659 : BEGIN\n\
660 : IF EXISTS(\n\
661 : SELECT nspname\n\
662 : FROM pg_catalog.pg_namespace\n\
663 : WHERE nspname = 'public'\n\
664 : ) AND\n\
665 : current_setting('server_version_num')::int/10000 >= 15\n\
666 : THEN\n\
667 : IF EXISTS(\n\
668 : SELECT rolname\n\
669 : FROM pg_catalog.pg_roles\n\
670 : WHERE rolname = 'web_access'\n\
671 : )\n\
672 : THEN\n\
673 : GRANT CREATE ON SCHEMA public TO web_access;\n\
674 : END IF;\n\
675 : END IF;\n\
676 : END\n\
677 : $$;"
678 : .to_string();
679 :
680 0 : info!(
681 0 : "grant query for db {} : {}",
682 0 : &db.name,
683 0 : inlinify(&grant_query)
684 0 : );
685 : db_client.simple_query(&grant_query)?;
686 :
687 : // it is important to run this after all grants
688 : if enable_anon_extension {
689 : handle_extension_anon(spec, &db.owner, &mut db_client, false)?;
690 : }
691 : }
692 :
693 : Ok(())
694 : }
695 :
696 : /// Create required system extensions
697 239 : #[instrument(skip_all)]
698 : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
699 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
700 : if libs.contains("pg_stat_statements") {
701 : // Create extension only if this compute really needs it
702 : let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
703 0 : info!("creating system extensions with query: {}", query);
704 : client.simple_query(query)?;
705 : }
706 : }
707 :
708 : Ok(())
709 : }
710 :
711 : /// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
712 239 : #[instrument(skip_all)]
713 : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
714 239 : info!("handle extension neon");
715 :
716 : let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
717 : client.simple_query(query)?;
718 :
719 : query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
720 239 : info!("create neon extension with query: {}", query);
721 : client.simple_query(query)?;
722 :
723 : query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
724 : client.simple_query(query)?;
725 :
726 : query = "ALTER EXTENSION neon SET SCHEMA neon";
727 239 : info!("alter neon extension schema with query: {}", query);
728 : client.simple_query(query)?;
729 :
730 : // this will be a no-op if extension is already up to date,
731 : // which may happen in two cases:
732 : // - extension was just installed
733 : // - extension was already installed and is up to date
734 : let query = "ALTER EXTENSION neon UPDATE";
735 239 : info!("update neon extension schema with query: {}", query);
736 : client.simple_query(query)?;
737 :
738 : Ok(())
739 : }
740 :
741 8 : #[instrument(skip_all)]
742 : pub fn handle_migrations(client: &mut Client) -> Result<()> {
743 8 : info!("handle migrations");
744 :
745 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
746 : // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
747 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
748 :
749 : let migrations = [
750 : "ALTER ROLE neon_superuser BYPASSRLS",
751 : r#"
752 : DO $$
753 : DECLARE
754 : role_name text;
755 : BEGIN
756 : FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, 'neon_superuser', 'member')
757 : LOOP
758 : RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
759 : EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
760 : END LOOP;
761 :
762 : FOR role_name IN SELECT rolname FROM pg_roles
763 : WHERE
764 : NOT pg_has_role(rolname, 'neon_superuser', 'member') AND NOT starts_with(rolname, 'pg_')
765 : LOOP
766 : RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
767 : EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
768 : END LOOP;
769 : END $$;
770 : "#,
771 : r#"
772 : DO $$
773 : BEGIN
774 : IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
775 : EXECUTE 'GRANT pg_create_subscription TO neon_superuser';
776 : END IF;
777 : END
778 : $$;"#,
779 : "GRANT pg_monitor TO neon_superuser WITH ADMIN OPTION",
780 : ];
781 :
782 : let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
783 : client.simple_query(query)?;
784 :
785 : query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
786 : client.simple_query(query)?;
787 :
788 : query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
789 : client.simple_query(query)?;
790 :
791 : query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
792 : client.simple_query(query)?;
793 :
794 : query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
795 : client.simple_query(query)?;
796 :
797 : query = "SELECT id FROM neon_migration.migration_id";
798 : let row = client.query_one(query, &[])?;
799 : let mut current_migration: usize = row.get::<&str, i64>("id") as usize;
800 : let starting_migration_id = current_migration;
801 :
802 : query = "BEGIN";
803 : client.simple_query(query)?;
804 :
805 : while current_migration < migrations.len() {
806 24 : info!("Running migration:\n{}\n", migrations[current_migration]);
807 : client.simple_query(migrations[current_migration])?;
808 : current_migration += 1;
809 : }
810 : let setval = format!(
811 : "UPDATE neon_migration.migration_id SET id={}",
812 : migrations.len()
813 : );
814 : client.simple_query(&setval)?;
815 :
816 : query = "COMMIT";
817 : client.simple_query(query)?;
818 :
819 6 : info!(
820 6 : "Ran {} migrations",
821 6 : (migrations.len() - starting_migration_id)
822 6 : );
823 :
824 : Ok(())
825 : }
826 :
827 : /// Connect to the database as superuser and pre-create anon extension
828 : /// if it is present in shared_preload_libraries
829 0 : #[instrument(skip_all)]
830 : pub fn handle_extension_anon(
831 : spec: &ComputeSpec,
832 : db_owner: &str,
833 : db_client: &mut Client,
834 : grants_only: bool,
835 : ) -> Result<()> {
836 0 : info!("handle extension anon");
837 :
838 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
839 : if libs.contains("anon") {
840 : if !grants_only {
841 : // check if extension is already initialized using anon.is_initialized()
842 : let query = "SELECT anon.is_initialized()";
843 : match db_client.query(query, &[]) {
844 : Ok(rows) => {
845 : if !rows.is_empty() {
846 : let is_initialized: bool = rows[0].get(0);
847 : if is_initialized {
848 0 : info!("anon extension is already initialized");
849 : return Ok(());
850 : }
851 : }
852 : }
853 : Err(e) => {
854 0 : warn!(
855 0 : "anon extension is_installed check failed with expected error: {}",
856 0 : e
857 0 : );
858 : }
859 : };
860 :
861 : // Create anon extension if this compute needs it
862 : // Users cannot create it themselves, because superuser is required.
863 : let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
864 0 : info!("creating anon extension with query: {}", query);
865 : match db_client.query(query, &[]) {
866 : Ok(_) => {}
867 : Err(e) => {
868 0 : error!("anon extension creation failed with error: {}", e);
869 : return Ok(());
870 : }
871 : }
872 :
873 : // check that extension is installed
874 : query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
875 : let rows = db_client.query(query, &[])?;
876 : if rows.is_empty() {
877 0 : error!("anon extension is not installed");
878 : return Ok(());
879 : }
880 :
881 : // Initialize anon extension
882 : // This also requires superuser privileges, so users cannot do it themselves.
883 : query = "SELECT anon.init()";
884 : match db_client.query(query, &[]) {
885 : Ok(_) => {}
886 : Err(e) => {
887 0 : error!("anon.init() failed with error: {}", e);
888 : return Ok(());
889 : }
890 : }
891 : }
892 :
893 : // check that extension is installed, if not bail early
894 : let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
895 : match db_client.query(query, &[]) {
896 : Ok(rows) => {
897 : if rows.is_empty() {
898 0 : error!("anon extension is not installed");
899 : return Ok(());
900 : }
901 : }
902 : Err(e) => {
903 0 : error!("anon extension check failed with error: {}", e);
904 : return Ok(());
905 : }
906 : };
907 :
908 : let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
909 0 : info!("granting anon extension permissions with query: {}", query);
910 : db_client.simple_query(&query)?;
911 :
912 : // Grant permissions to db_owner to use anon extension functions
913 : let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
914 0 : info!("granting anon extension permissions with query: {}", query);
915 : db_client.simple_query(&query)?;
916 :
917 : // This is needed, because some functions are defined as SECURITY DEFINER.
918 : // In Postgres SECURITY DEFINER functions are executed with the privileges
919 : // of the owner.
920 : // In anon extension this it is needed to access some GUCs, which are only accessible to
921 : // superuser. But we've patched postgres to allow db_owner to access them as well.
922 : // So we need to change owner of these functions to db_owner.
923 : let query = format!("
924 : SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};'
925 : from pg_proc p
926 : join pg_namespace nsp ON p.pronamespace = nsp.oid
927 : where nsp.nspname = 'anon';", db_owner);
928 :
929 0 : info!("change anon extension functions owner to db owner");
930 : db_client.simple_query(&query)?;
931 :
932 : // affects views as well
933 : let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
934 0 : info!("granting anon extension permissions with query: {}", query);
935 : db_client.simple_query(&query)?;
936 :
937 : let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
938 0 : info!("granting anon extension permissions with query: {}", query);
939 : db_client.simple_query(&query)?;
940 : }
941 : }
942 :
943 : Ok(())
944 : }
|