TLA Line data Source code
1 : use std::fs::File;
2 : use std::path::Path;
3 : use std::str::FromStr;
4 :
5 : use anyhow::{anyhow, bail, Result};
6 : use postgres::config::Config;
7 : use postgres::{Client, NoTls};
8 : use reqwest::StatusCode;
9 : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
10 :
11 : use crate::config;
12 : use crate::params::PG_HBA_ALL_MD5;
13 : use crate::pg_helpers::*;
14 :
15 : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
16 : use compute_api::spec::{ComputeSpec, PgIdent, Role};
17 :
18 : // Do control plane request and return response if any. In case of error it
19 : // returns a bool flag indicating whether it makes sense to retry the request
20 : // and a string with error message.
21 UBC 0 : fn do_control_plane_request(
22 0 : uri: &str,
23 0 : jwt: &str,
24 0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
25 0 : let resp = reqwest::blocking::Client::new()
26 0 : .get(uri)
27 0 : .header("Authorization", format!("Bearer {}", jwt))
28 0 : .send()
29 0 : .map_err(|e| {
30 0 : (
31 0 : true,
32 0 : format!("could not perform spec request to control plane: {}", e),
33 0 : )
34 0 : })?;
35 :
36 0 : match resp.status() {
37 0 : StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
38 0 : Ok(spec_resp) => Ok(spec_resp),
39 0 : Err(e) => Err((
40 0 : true,
41 0 : format!("could not deserialize control plane response: {}", e),
42 0 : )),
43 : },
44 : StatusCode::SERVICE_UNAVAILABLE => {
45 0 : Err((true, "control plane is temporarily unavailable".to_string()))
46 : }
47 : StatusCode::BAD_GATEWAY => {
48 : // We have a problem with intermittent 502 errors now
49 : // https://github.com/neondatabase/cloud/issues/2353
50 : // It's fine to retry GET request in this case.
51 0 : Err((true, "control plane request failed with 502".to_string()))
52 : }
53 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
54 : // or some internal failure happened. Doesn't make much sense to retry in this case.
55 0 : _ => Err((
56 0 : false,
57 0 : format!(
58 0 : "unexpected control plane response status code: {}",
59 0 : resp.status()
60 0 : ),
61 0 : )),
62 : }
63 0 : }
64 :
65 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
66 : /// env variable is set, it will be used for authorization.
67 0 : pub fn get_spec_from_control_plane(
68 0 : base_uri: &str,
69 0 : compute_id: &str,
70 0 : ) -> Result<Option<ComputeSpec>> {
71 0 : let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
72 0 : let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
73 0 : Ok(v) => v,
74 0 : Err(_) => "".to_string(),
75 : };
76 0 : let mut attempt = 1;
77 0 : let mut spec: Result<Option<ComputeSpec>> = Ok(None);
78 0 :
79 0 : info!("getting spec from control plane: {}", cp_uri);
80 :
81 : // Do 3 attempts to get spec from the control plane using the following logic:
82 : // - network error -> then retry
83 : // - compute id is unknown or any other error -> bail out
84 : // - no spec for compute yet (Empty state) -> return Ok(None)
85 : // - got spec -> return Ok(Some(spec))
86 0 : while attempt < 4 {
87 0 : spec = match do_control_plane_request(&cp_uri, &jwt) {
88 0 : Ok(spec_resp) => match spec_resp.status {
89 0 : ControlPlaneComputeStatus::Empty => Ok(None),
90 : ControlPlaneComputeStatus::Attached => {
91 0 : if let Some(spec) = spec_resp.spec {
92 0 : Ok(Some(spec))
93 : } else {
94 0 : bail!("compute is attached, but spec is empty")
95 : }
96 : }
97 : },
98 0 : Err((retry, msg)) => {
99 0 : if retry {
100 0 : Err(anyhow!(msg))
101 : } else {
102 0 : bail!(msg);
103 : }
104 : }
105 : };
106 :
107 0 : if let Err(e) = &spec {
108 0 : error!("attempt {} to get spec failed with: {}", attempt, e);
109 : } else {
110 0 : return spec;
111 : }
112 :
113 0 : attempt += 1;
114 0 : std::thread::sleep(std::time::Duration::from_millis(100));
115 : }
116 :
117 : // All attempts failed, return error.
118 0 : spec
119 0 : }
120 :
121 : /// Check `pg_hba.conf` and update if needed to allow external connections.
122 CBC 537 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
123 537 : // XXX: consider making it a part of spec.json
124 537 : info!("checking pg_hba.conf");
125 537 : let pghba_path = pgdata_path.join("pg_hba.conf");
126 537 :
127 537 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
128 537 : info!("updated pg_hba.conf to allow external connections");
129 : } else {
130 UBC 0 : info!("pg_hba.conf is up-to-date");
131 : }
132 :
133 CBC 537 : Ok(())
134 537 : }
135 :
136 : /// Create a standby.signal file
137 46 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
138 46 : // XXX: consider making it a part of spec.json
139 46 : info!("adding standby.signal");
140 46 : let signalfile = pgdata_path.join("standby.signal");
141 46 :
142 46 : if !signalfile.exists() {
143 46 : info!("created standby.signal");
144 46 : File::create(signalfile)?;
145 : } else {
146 UBC 0 : info!("reused pre-existing standby.signal");
147 : }
148 CBC 46 : Ok(())
149 46 : }
150 :
151 : /// Compute could be unexpectedly shut down, for example, during the
152 : /// database dropping. This leaves the database in the invalid state,
153 : /// which prevents new db creation with the same name. This function
154 : /// will clean it up before proceeding with catalog updates. All
155 : /// possible future cleanup operations may go here too.
156 221 : #[instrument(skip_all)]
157 : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
158 : let existing_dbs = get_existing_dbs(client)?;
159 :
160 : for (_, db) in existing_dbs {
161 : if db.invalid {
162 : // After recent commit in Postgres, interrupted DROP DATABASE
163 : // leaves the database in the invalid state. According to the
164 : // commit message, the only option for user is to drop it again.
165 : // See:
166 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
167 : //
168 : // Postgres Neon extension is done the way, that db is de-registered
169 : // in the control plane metadata only after it is dropped. So there is
170 : // a chance that it still thinks that db should exist. This means
171 : // that it will be re-created by `handle_databases()`. Yet, it's fine
172 : // as user can just repeat drop (in vanilla Postgres they would need
173 : // to do the same, btw).
174 : let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
175 1 : info!("dropping invalid database {}", db.name);
176 : client.execute(query.as_str(), &[])?;
177 : }
178 : }
179 :
180 : Ok(())
181 : }
182 :
183 : /// Given a cluster spec json and open transaction it handles roles creation,
184 : /// deletion and update.
185 221 : #[instrument(skip_all)]
186 : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
187 : let mut xact = client.transaction()?;
188 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
189 :
190 : // Print a list of existing Postgres roles (only in debug mode)
191 221 : if span_enabled!(Level::INFO) {
192 221 : info!("postgres roles:");
193 : for r in &existing_roles {
194 2656 : info!(
195 2656 : " - {}:{}",
196 2656 : r.name,
197 2656 : if r.encrypted_password.is_some() {
198 2656 : "[FILTERED]"
199 2656 : } else {
200 2656 : "(null)"
201 2656 : }
202 2656 : );
203 : }
204 : }
205 :
206 : // Process delta operations first
207 : if let Some(ops) = &spec.delta_operations {
208 UBC 0 : info!("processing role renames");
209 : for op in ops {
210 : match op.action.as_ref() {
211 : "delete_role" => {
212 : // no-op now, roles will be deleted at the end of configuration
213 : }
214 : // Renaming role drops its password, since role name is
215 : // used as a salt there. It is important that this role
216 : // is recorded with a new `name` in the `roles` list.
217 : // Follow up roles update will set the new password.
218 : "rename_role" => {
219 : let new_name = op.new_name.as_ref().unwrap();
220 :
221 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
222 0 : if existing_roles.iter().any(|r| r.name == op.name) {
223 : let query: String = format!(
224 : "ALTER ROLE {} RENAME TO {}",
225 : op.name.pg_quote(),
226 : new_name.pg_quote()
227 : );
228 :
229 0 : warn!("renaming role '{}' to '{}'", op.name, new_name);
230 : xact.execute(query.as_str(), &[])?;
231 : }
232 : }
233 : _ => {}
234 : }
235 : }
236 : }
237 :
238 : // Refresh Postgres roles info to handle possible roles renaming
239 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
240 :
241 CBC 221 : info!("cluster spec roles:");
242 : for role in &spec.cluster.roles {
243 : let name = &role.name;
244 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
245 UBC 0 : let pg_role = existing_roles.iter().find(|r| r.name == *name);
246 :
247 : enum RoleAction {
248 : None,
249 : Update,
250 : Create,
251 : }
252 : let action = if let Some(r) = pg_role {
253 : if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
254 : || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
255 : {
256 : RoleAction::Update
257 : } else if let Some(pg_pwd) = &r.encrypted_password {
258 : // Check whether password changed or not (trim 'md5' prefix first if any)
259 : //
260 : // This is a backward compatibility hack, which comes from the times when we were using
261 : // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
262 : // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
263 : // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
264 : // Here is the only place so far where we compare hashes, so it seems to be the best candidate
265 : // to place this compatibility layer.
266 : let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
267 : stripped
268 : } else {
269 : pg_pwd
270 : };
271 : if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
272 : RoleAction::Update
273 : } else {
274 : RoleAction::None
275 : }
276 : } else {
277 : RoleAction::None
278 : }
279 : } else {
280 : RoleAction::Create
281 : };
282 :
283 : match action {
284 : RoleAction::None => {}
285 : RoleAction::Update => {
286 : // This can be run on /every/ role! Not just ones created through the console.
287 : // This means that if you add some funny ALTER here that adds a permission,
288 : // this will get run even on user-created roles! This will result in different
289 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
290 : // now only grants LOGIN and changes the password. Please do not allow this branch
291 : // to do anything silly.
292 : let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
293 : query.push_str(&role.to_pg_options());
294 : xact.execute(query.as_str(), &[])?;
295 : }
296 : RoleAction::Create => {
297 : // This branch only runs when roles are created through the console, so it is
298 : // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
299 : // from neon_superuser.
300 : let mut query: String = format!(
301 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
302 : name.pg_quote()
303 : );
304 0 : info!("role create query: '{}'", &query);
305 : query.push_str(&role.to_pg_options());
306 : xact.execute(query.as_str(), &[])?;
307 : }
308 : }
309 :
310 0 : if span_enabled!(Level::INFO) {
311 : let pwd = if role.encrypted_password.is_some() {
312 : "[FILTERED]"
313 : } else {
314 : "(null)"
315 : };
316 : let action_str = match action {
317 : RoleAction::None => "",
318 : RoleAction::Create => " -> create",
319 : RoleAction::Update => " -> update",
320 : };
321 0 : info!(" - {}:{}{}", name, pwd, action_str);
322 : }
323 : }
324 :
325 : xact.commit()?;
326 :
327 : Ok(())
328 : }
329 :
330 : /// Reassign all dependent objects and delete requested roles.
331 CBC 221 : #[instrument(skip_all)]
332 : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
333 : if let Some(ops) = &spec.delta_operations {
334 : // First, reassign all dependent objects to db owners.
335 UBC 0 : info!("reassigning dependent objects of to-be-deleted roles");
336 :
337 : // Fetch existing roles. We could've exported and used `existing_roles` from
338 : // `handle_roles()`, but we only make this list there before creating new roles.
339 : // Which is probably fine as we never create to-be-deleted roles, but that'd
340 : // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
341 : // buffers already, so this shouldn't be a big deal.
342 : let mut xact = client.transaction()?;
343 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
344 : xact.commit()?;
345 :
346 : for op in ops {
347 : // Check that role is still present in Postgres, as this could be a
348 : // restart with the same spec after role deletion.
349 0 : if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
350 : reassign_owned_objects(spec, connstr, &op.name)?;
351 : }
352 : }
353 :
354 : // Second, proceed with role deletions.
355 0 : info!("processing role deletions");
356 : let mut xact = client.transaction()?;
357 : for op in ops {
358 : // We do not check either role exists or not,
359 : // Postgres will take care of it for us
360 : if op.action == "delete_role" {
361 : let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
362 :
363 0 : warn!("deleting role '{}'", &op.name);
364 : xact.execute(query.as_str(), &[])?;
365 : }
366 : }
367 : xact.commit()?;
368 : }
369 :
370 : Ok(())
371 : }
372 :
373 0 : fn reassign_owned_objects_in_one_db(
374 0 : conf: Config,
375 0 : role_name: &PgIdent,
376 0 : db_owner: &PgIdent,
377 0 : ) -> Result<()> {
378 0 : let mut client = conf.connect(NoTls)?;
379 :
380 : // This will reassign all dependent objects to the db owner
381 0 : let reassign_query = format!(
382 0 : "REASSIGN OWNED BY {} TO {}",
383 0 : role_name.pg_quote(),
384 0 : db_owner.pg_quote()
385 0 : );
386 0 : info!(
387 0 : "reassigning objects owned by '{}' in db '{}' to '{}'",
388 0 : role_name,
389 0 : conf.get_dbname().unwrap_or(""),
390 0 : db_owner
391 0 : );
392 0 : client.simple_query(&reassign_query)?;
393 :
394 : // This now will only drop privileges of the role
395 0 : let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
396 0 : client.simple_query(&drop_query)?;
397 0 : Ok(())
398 0 : }
399 :
400 : // Reassign all owned objects in all databases to the owner of the database.
401 0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
402 0 : for db in &spec.cluster.databases {
403 0 : if db.owner != *role_name {
404 0 : let mut conf = Config::from_str(connstr)?;
405 0 : conf.dbname(&db.name);
406 0 : reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
407 0 : }
408 : }
409 :
410 : // Also handle case when there are no databases in the spec.
411 : // In this case we need to reassign objects in the default database.
412 0 : let conf = Config::from_str(connstr)?;
413 0 : let db_owner = PgIdent::from_str("cloud_admin")?;
414 0 : reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
415 :
416 0 : Ok(())
417 0 : }
418 :
419 : /// It follows mostly the same logic as `handle_roles()` excepting that we
420 : /// does not use an explicit transactions block, since major database operations
421 : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
422 : /// atomicity should be enough here due to the order of operations and various checks,
423 : /// which together provide us idempotency.
424 CBC 221 : #[instrument(skip_all)]
425 : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
426 : let existing_dbs = get_existing_dbs(client)?;
427 :
428 : // Print a list of existing Postgres databases (only in debug mode)
429 221 : if span_enabled!(Level::INFO) {
430 221 : info!("postgres databases:");
431 : for (dbname, db) in &existing_dbs {
432 663 : info!(" {}:{}", dbname, db.owner);
433 : }
434 : }
435 :
436 : // Process delta operations first
437 : if let Some(ops) = &spec.delta_operations {
438 UBC 0 : info!("processing delta operations on databases");
439 : for op in ops {
440 : match op.action.as_ref() {
441 : // We do not check either DB exists or not,
442 : // Postgres will take care of it for us
443 : "delete_db" => {
444 : // In Postgres we can't drop a database if it is a template.
445 : // So we need to unset the template flag first, but it could
446 : // be a retry, so we could've already dropped the database.
447 : // Check that database exists first to make it idempotent.
448 : let unset_template_query: String = format!(
449 : "
450 : DO $$
451 : BEGIN
452 : IF EXISTS(
453 : SELECT 1
454 : FROM pg_catalog.pg_database
455 : WHERE datname = {}
456 : )
457 : THEN
458 : ALTER DATABASE {} is_template false;
459 : END IF;
460 : END
461 : $$;",
462 : escape_literal(&op.name),
463 : &op.name.pg_quote()
464 : );
465 : // Use FORCE to drop database even if there are active connections.
466 : // We run this from `cloud_admin`, so it should have enough privileges.
467 : // NB: there could be other db states, which prevent us from dropping
468 : // the database. For example, if db is used by any active subscription
469 : // or replication slot.
470 : // TODO: deal with it once we allow logical replication. Proper fix should
471 : // involve returning an error code to the control plane, so it could
472 : // figure out that this is a non-retryable error, return it to the user
473 : // and fail operation permanently.
474 : let drop_db_query: String = format!(
475 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
476 : &op.name.pg_quote()
477 : );
478 :
479 0 : warn!("deleting database '{}'", &op.name);
480 : client.execute(unset_template_query.as_str(), &[])?;
481 : client.execute(drop_db_query.as_str(), &[])?;
482 : }
483 : "rename_db" => {
484 : let new_name = op.new_name.as_ref().unwrap();
485 :
486 : if existing_dbs.get(&op.name).is_some() {
487 : let query: String = format!(
488 : "ALTER DATABASE {} RENAME TO {}",
489 : op.name.pg_quote(),
490 : new_name.pg_quote()
491 : );
492 :
493 0 : warn!("renaming database '{}' to '{}'", op.name, new_name);
494 : client.execute(query.as_str(), &[])?;
495 : }
496 : }
497 : _ => {}
498 : }
499 : }
500 : }
501 :
502 : // Refresh Postgres databases info to handle possible renames
503 : let existing_dbs = get_existing_dbs(client)?;
504 :
505 CBC 221 : info!("cluster spec databases:");
506 : for db in &spec.cluster.databases {
507 : let name = &db.name;
508 : let pg_db = existing_dbs.get(name);
509 :
510 : enum DatabaseAction {
511 : None,
512 : Update,
513 : Create,
514 : }
515 : let action = if let Some(r) = pg_db {
516 : // XXX: db owner name is returned as quoted string from Postgres,
517 : // when quoting is needed.
518 : let new_owner = if r.owner.starts_with('"') {
519 : db.owner.pg_quote()
520 : } else {
521 : db.owner.clone()
522 : };
523 :
524 : if new_owner != r.owner {
525 : // Update the owner
526 : DatabaseAction::Update
527 : } else {
528 : DatabaseAction::None
529 : }
530 : } else {
531 : DatabaseAction::Create
532 : };
533 :
534 : match action {
535 : DatabaseAction::None => {}
536 : DatabaseAction::Update => {
537 : let query: String = format!(
538 : "ALTER DATABASE {} OWNER TO {}",
539 : name.pg_quote(),
540 : db.owner.pg_quote()
541 : );
542 : let _guard = info_span!("executing", query).entered();
543 : client.execute(query.as_str(), &[])?;
544 : }
545 : DatabaseAction::Create => {
546 : let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
547 : query.push_str(&db.to_pg_options());
548 : let _guard = info_span!("executing", query).entered();
549 : client.execute(query.as_str(), &[])?;
550 : let grant_query: String = format!(
551 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
552 : name.pg_quote()
553 : );
554 : client.execute(grant_query.as_str(), &[])?;
555 : }
556 : };
557 :
558 UBC 0 : if span_enabled!(Level::INFO) {
559 : let action_str = match action {
560 : DatabaseAction::None => "",
561 : DatabaseAction::Create => " -> create",
562 : DatabaseAction::Update => " -> update",
563 : };
564 0 : info!(" - {}:{}{}", db.name, db.owner, action_str);
565 : }
566 : }
567 :
568 : Ok(())
569 : }
570 :
571 : /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
572 : /// to allow users creating trusted extensions and re-creating `public` schema, for example.
573 CBC 221 : #[instrument(skip_all)]
574 : pub fn handle_grants(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
575 221 : info!("modifying database permissions");
576 : let existing_dbs = get_existing_dbs(client)?;
577 :
578 : // Do some per-database access adjustments. We'd better do this at db creation time,
579 : // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
580 : // atomically.
581 : for db in &spec.cluster.databases {
582 : match existing_dbs.get(&db.name) {
583 : Some(pg_db) => {
584 : if pg_db.restrict_conn || pg_db.invalid {
585 UBC 0 : info!(
586 0 : "skipping grants for db {} (invalid: {}, connections not allowed: {})",
587 0 : db.name, pg_db.invalid, pg_db.restrict_conn
588 0 : );
589 : continue;
590 : }
591 : }
592 : None => {
593 : bail!(
594 : "database {} doesn't exist in Postgres after handle_databases()",
595 : db.name
596 : );
597 : }
598 : }
599 :
600 : let mut conf = Config::from_str(connstr)?;
601 : conf.dbname(&db.name);
602 :
603 : let mut db_client = conf.connect(NoTls)?;
604 :
605 : // This will only change ownership on the schema itself, not the objects
606 : // inside it. Without it owner of the `public` schema will be `cloud_admin`
607 : // and database owner cannot do anything with it. SQL procedure ensures
608 : // that it won't error out if schema `public` doesn't exist.
609 : let alter_query = format!(
610 : "DO $$\n\
611 : DECLARE\n\
612 : schema_owner TEXT;\n\
613 : BEGIN\n\
614 : IF EXISTS(\n\
615 : SELECT nspname\n\
616 : FROM pg_catalog.pg_namespace\n\
617 : WHERE nspname = 'public'\n\
618 : )\n\
619 : THEN\n\
620 : SELECT nspowner::regrole::text\n\
621 : FROM pg_catalog.pg_namespace\n\
622 : WHERE nspname = 'public'\n\
623 : INTO schema_owner;\n\
624 : \n\
625 : IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
626 : THEN\n\
627 : ALTER SCHEMA public OWNER TO {};\n\
628 : END IF;\n\
629 : END IF;\n\
630 : END\n\
631 : $$;",
632 : db.owner.pg_quote()
633 : );
634 : db_client.simple_query(&alter_query)?;
635 :
636 : // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
637 : // This is needed because since postgres 15 this privilege is removed by default.
638 : // TODO: web_access isn't created for almost 1 year. It could be that we have
639 : // active users of 1 year old projects, but hopefully not, so check it and
640 : // remove this code if possible. The worst thing that could happen is that
641 : // user won't be able to use public schema in NEW databases created in the
642 : // very OLD project.
643 : let grant_query = "DO $$\n\
644 : BEGIN\n\
645 : IF EXISTS(\n\
646 : SELECT nspname\n\
647 : FROM pg_catalog.pg_namespace\n\
648 : WHERE nspname = 'public'\n\
649 : ) AND\n\
650 : current_setting('server_version_num')::int/10000 >= 15\n\
651 : THEN\n\
652 : IF EXISTS(\n\
653 : SELECT rolname\n\
654 : FROM pg_catalog.pg_roles\n\
655 : WHERE rolname = 'web_access'\n\
656 : )\n\
657 : THEN\n\
658 : GRANT CREATE ON SCHEMA public TO web_access;\n\
659 : END IF;\n\
660 : END IF;\n\
661 : END\n\
662 : $$;"
663 : .to_string();
664 :
665 0 : info!("grant query for db {} : {}", &db.name, &grant_query);
666 : db_client.simple_query(&grant_query)?;
667 : }
668 :
669 : Ok(())
670 : }
671 :
672 : /// Create required system extensions
673 CBC 221 : #[instrument(skip_all)]
674 : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
675 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
676 : if libs.contains("pg_stat_statements") {
677 : // Create extension only if this compute really needs it
678 : let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
679 UBC 0 : info!("creating system extensions with query: {}", query);
680 : client.simple_query(query)?;
681 : }
682 : }
683 :
684 : Ok(())
685 : }
686 :
687 : /// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
688 CBC 221 : #[instrument(skip_all)]
689 : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
690 221 : info!("handle extension neon");
691 :
692 : let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
693 : client.simple_query(query)?;
694 :
695 : query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
696 221 : info!("create neon extension with query: {}", query);
697 : client.simple_query(query)?;
698 :
699 : query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
700 : client.simple_query(query)?;
701 :
702 : query = "ALTER EXTENSION neon SET SCHEMA neon";
703 221 : info!("alter neon extension schema with query: {}", query);
704 : client.simple_query(query)?;
705 :
706 : // this will be a no-op if extension is already up to date,
707 : // which may happen in two cases:
708 : // - extension was just installed
709 : // - extension was already installed and is up to date
710 : let query = "ALTER EXTENSION neon UPDATE";
711 221 : info!("update neon extension schema with query: {}", query);
712 : client.simple_query(query)?;
713 :
714 : Ok(())
715 : }
|