Line data Source code
1 : use std::fs::File;
2 : use std::path::Path;
3 : use std::str::FromStr;
4 :
5 : use anyhow::{anyhow, bail, Result};
6 : use postgres::config::Config;
7 : use postgres::{Client, NoTls};
8 : use reqwest::StatusCode;
9 : use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
10 :
11 : use crate::config;
12 : use crate::logger::inlinify;
13 : use crate::params::PG_HBA_ALL_MD5;
14 : use crate::pg_helpers::*;
15 :
16 : use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
17 : use compute_api::spec::{ComputeSpec, PgIdent, Role};
18 :
19 : // Do control plane request and return response if any. In case of error it
20 : // returns a bool flag indicating whether it makes sense to retry the request
21 : // and a string with error message.
22 0 : fn do_control_plane_request(
23 0 : uri: &str,
24 0 : jwt: &str,
25 0 : ) -> Result<ControlPlaneSpecResponse, (bool, String)> {
26 0 : let resp = reqwest::blocking::Client::new()
27 0 : .get(uri)
28 0 : .header("Authorization", format!("Bearer {}", jwt))
29 0 : .send()
30 0 : .map_err(|e| {
31 0 : (
32 0 : true,
33 0 : format!("could not perform spec request to control plane: {}", e),
34 0 : )
35 0 : })?;
36 :
37 0 : match resp.status() {
38 0 : StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
39 0 : Ok(spec_resp) => Ok(spec_resp),
40 0 : Err(e) => Err((
41 0 : true,
42 0 : format!("could not deserialize control plane response: {}", e),
43 0 : )),
44 : },
45 : StatusCode::SERVICE_UNAVAILABLE => {
46 0 : Err((true, "control plane is temporarily unavailable".to_string()))
47 : }
48 : StatusCode::BAD_GATEWAY => {
49 : // We have a problem with intermittent 502 errors now
50 : // https://github.com/neondatabase/cloud/issues/2353
51 : // It's fine to retry GET request in this case.
52 0 : Err((true, "control plane request failed with 502".to_string()))
53 : }
54 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
55 : // or some internal failure happened. Doesn't make much sense to retry in this case.
56 0 : _ => Err((
57 0 : false,
58 0 : format!(
59 0 : "unexpected control plane response status code: {}",
60 0 : resp.status()
61 0 : ),
62 0 : )),
63 : }
64 0 : }
65 :
66 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
67 : /// env variable is set, it will be used for authorization.
68 0 : pub fn get_spec_from_control_plane(
69 0 : base_uri: &str,
70 0 : compute_id: &str,
71 0 : ) -> Result<Option<ComputeSpec>> {
72 0 : let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
73 0 : let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
74 0 : Ok(v) => v,
75 0 : Err(_) => "".to_string(),
76 : };
77 0 : let mut attempt = 1;
78 0 : let mut spec: Result<Option<ComputeSpec>> = Ok(None);
79 0 :
80 0 : info!("getting spec from control plane: {}", cp_uri);
81 :
82 : // Do 3 attempts to get spec from the control plane using the following logic:
83 : // - network error -> then retry
84 : // - compute id is unknown or any other error -> bail out
85 : // - no spec for compute yet (Empty state) -> return Ok(None)
86 : // - got spec -> return Ok(Some(spec))
87 0 : while attempt < 4 {
88 0 : spec = match do_control_plane_request(&cp_uri, &jwt) {
89 0 : Ok(spec_resp) => match spec_resp.status {
90 0 : ControlPlaneComputeStatus::Empty => Ok(None),
91 : ControlPlaneComputeStatus::Attached => {
92 0 : if let Some(spec) = spec_resp.spec {
93 0 : Ok(Some(spec))
94 : } else {
95 0 : bail!("compute is attached, but spec is empty")
96 : }
97 : }
98 : },
99 0 : Err((retry, msg)) => {
100 0 : if retry {
101 0 : Err(anyhow!(msg))
102 : } else {
103 0 : bail!(msg);
104 : }
105 : }
106 : };
107 :
108 0 : if let Err(e) = &spec {
109 0 : error!("attempt {} to get spec failed with: {}", attempt, e);
110 : } else {
111 0 : return spec;
112 : }
113 :
114 0 : attempt += 1;
115 0 : std::thread::sleep(std::time::Duration::from_millis(100));
116 : }
117 :
118 : // All attempts failed, return error.
119 0 : spec
120 0 : }
121 :
122 : /// Check `pg_hba.conf` and update if needed to allow external connections.
123 575 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
124 575 : // XXX: consider making it a part of spec.json
125 575 : info!("checking pg_hba.conf");
126 575 : let pghba_path = pgdata_path.join("pg_hba.conf");
127 575 :
128 575 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
129 575 : info!("updated pg_hba.conf to allow external connections");
130 : } else {
131 0 : info!("pg_hba.conf is up-to-date");
132 : }
133 :
134 575 : Ok(())
135 575 : }
136 :
137 : /// Create a standby.signal file
138 49 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
139 49 : // XXX: consider making it a part of spec.json
140 49 : info!("adding standby.signal");
141 49 : let signalfile = pgdata_path.join("standby.signal");
142 49 :
143 49 : if !signalfile.exists() {
144 49 : info!("created standby.signal");
145 49 : File::create(signalfile)?;
146 : } else {
147 0 : info!("reused pre-existing standby.signal");
148 : }
149 49 : Ok(())
150 49 : }
151 :
152 : /// Compute could be unexpectedly shut down, for example, during the
153 : /// database dropping. This leaves the database in the invalid state,
154 : /// which prevents new db creation with the same name. This function
155 : /// will clean it up before proceeding with catalog updates. All
156 : /// possible future cleanup operations may go here too.
157 229 : #[instrument(skip_all)]
158 : pub fn cleanup_instance(client: &mut Client) -> Result<()> {
159 : let existing_dbs = get_existing_dbs(client)?;
160 :
161 : for (_, db) in existing_dbs {
162 : if db.invalid {
163 : // After recent commit in Postgres, interrupted DROP DATABASE
164 : // leaves the database in the invalid state. According to the
165 : // commit message, the only option for user is to drop it again.
166 : // See:
167 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
168 : //
169 : // Postgres Neon extension is done the way, that db is de-registered
170 : // in the control plane metadata only after it is dropped. So there is
171 : // a chance that it still thinks that db should exist. This means
172 : // that it will be re-created by `handle_databases()`. Yet, it's fine
173 : // as user can just repeat drop (in vanilla Postgres they would need
174 : // to do the same, btw).
175 : let query = format!("DROP DATABASE IF EXISTS {}", db.name.pg_quote());
176 1 : info!("dropping invalid database {}", db.name);
177 : client.execute(query.as_str(), &[])?;
178 : }
179 : }
180 :
181 : Ok(())
182 : }
183 :
184 : /// Given a cluster spec json and open transaction it handles roles creation,
185 : /// deletion and update.
186 229 : #[instrument(skip_all)]
187 : pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
188 : let mut xact = client.transaction()?;
189 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
190 :
191 : // Print a list of existing Postgres roles (only in debug mode)
192 229 : if span_enabled!(Level::INFO) {
193 : let mut vec = Vec::new();
194 : for r in &existing_roles {
195 : vec.push(format!(
196 : "{}:{}",
197 : r.name,
198 : if r.encrypted_password.is_some() {
199 : "[FILTERED]"
200 : } else {
201 : "(null)"
202 : }
203 : ));
204 : }
205 :
206 229 : info!("postgres roles (total {}): {:?}", vec.len(), vec);
207 : }
208 :
209 : // Process delta operations first
210 : if let Some(ops) = &spec.delta_operations {
211 0 : info!("processing role renames");
212 : for op in ops {
213 : match op.action.as_ref() {
214 : "delete_role" => {
215 : // no-op now, roles will be deleted at the end of configuration
216 : }
217 : // Renaming role drops its password, since role name is
218 : // used as a salt there. It is important that this role
219 : // is recorded with a new `name` in the `roles` list.
220 : // Follow up roles update will set the new password.
221 : "rename_role" => {
222 : let new_name = op.new_name.as_ref().unwrap();
223 :
224 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
225 0 : if existing_roles.iter().any(|r| r.name == op.name) {
226 : let query: String = format!(
227 : "ALTER ROLE {} RENAME TO {}",
228 : op.name.pg_quote(),
229 : new_name.pg_quote()
230 : );
231 :
232 0 : warn!("renaming role '{}' to '{}'", op.name, new_name);
233 : xact.execute(query.as_str(), &[])?;
234 : }
235 : }
236 : _ => {}
237 : }
238 : }
239 : }
240 :
241 : // Refresh Postgres roles info to handle possible roles renaming
242 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
243 :
244 229 : info!(
245 229 : "handling cluster spec roles (total {})",
246 229 : spec.cluster.roles.len()
247 229 : );
248 : for role in &spec.cluster.roles {
249 : let name = &role.name;
250 : // XXX: with a limited number of roles it is fine, but consider making it a HashMap
251 0 : let pg_role = existing_roles.iter().find(|r| r.name == *name);
252 :
253 : enum RoleAction {
254 : None,
255 : Update,
256 : Create,
257 : }
258 : let action = if let Some(r) = pg_role {
259 : if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
260 : || (r.encrypted_password.is_some() && role.encrypted_password.is_none())
261 : {
262 : RoleAction::Update
263 : } else if let Some(pg_pwd) = &r.encrypted_password {
264 : // Check whether password changed or not (trim 'md5' prefix first if any)
265 : //
266 : // This is a backward compatibility hack, which comes from the times when we were using
267 : // md5 for everyone and hashes were stored in the console db without md5 prefix. So when
268 : // role comes from the control-plane (json spec) `Role.encrypted_password` doesn't have md5 prefix,
269 : // but when role comes from Postgres (`get_existing_roles` / `existing_roles`) it has this prefix.
270 : // Here is the only place so far where we compare hashes, so it seems to be the best candidate
271 : // to place this compatibility layer.
272 : let pg_pwd = if let Some(stripped) = pg_pwd.strip_prefix("md5") {
273 : stripped
274 : } else {
275 : pg_pwd
276 : };
277 : if pg_pwd != *role.encrypted_password.as_ref().unwrap() {
278 : RoleAction::Update
279 : } else {
280 : RoleAction::None
281 : }
282 : } else {
283 : RoleAction::None
284 : }
285 : } else {
286 : RoleAction::Create
287 : };
288 :
289 : match action {
290 : RoleAction::None => {}
291 : RoleAction::Update => {
292 : // This can be run on /every/ role! Not just ones created through the console.
293 : // This means that if you add some funny ALTER here that adds a permission,
294 : // this will get run even on user-created roles! This will result in different
295 : // behavior before and after a spec gets reapplied. The below ALTER as it stands
296 : // now only grants LOGIN and changes the password. Please do not allow this branch
297 : // to do anything silly.
298 : let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
299 : query.push_str(&role.to_pg_options());
300 : xact.execute(query.as_str(), &[])?;
301 : }
302 : RoleAction::Create => {
303 : // This branch only runs when roles are created through the console, so it is
304 : // safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
305 : // from neon_superuser.
306 : let mut query: String = format!(
307 : "CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
308 : name.pg_quote()
309 : );
310 0 : info!("running role create query: '{}'", &query);
311 : query.push_str(&role.to_pg_options());
312 : xact.execute(query.as_str(), &[])?;
313 : }
314 : }
315 :
316 0 : if span_enabled!(Level::INFO) {
317 : let pwd = if role.encrypted_password.is_some() {
318 : "[FILTERED]"
319 : } else {
320 : "(null)"
321 : };
322 : let action_str = match action {
323 : RoleAction::None => "",
324 : RoleAction::Create => " -> create",
325 : RoleAction::Update => " -> update",
326 : };
327 0 : info!(" - {}:{}{}", name, pwd, action_str);
328 : }
329 : }
330 :
331 : xact.commit()?;
332 :
333 : Ok(())
334 : }
335 :
336 : /// Reassign all dependent objects and delete requested roles.
337 229 : #[instrument(skip_all)]
338 : pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
339 : if let Some(ops) = &spec.delta_operations {
340 : // First, reassign all dependent objects to db owners.
341 0 : info!("reassigning dependent objects of to-be-deleted roles");
342 :
343 : // Fetch existing roles. We could've exported and used `existing_roles` from
344 : // `handle_roles()`, but we only make this list there before creating new roles.
345 : // Which is probably fine as we never create to-be-deleted roles, but that'd
346 : // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
347 : // buffers already, so this shouldn't be a big deal.
348 : let mut xact = client.transaction()?;
349 : let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
350 : xact.commit()?;
351 :
352 : for op in ops {
353 : // Check that role is still present in Postgres, as this could be a
354 : // restart with the same spec after role deletion.
355 0 : if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
356 : reassign_owned_objects(spec, connstr, &op.name)?;
357 : }
358 : }
359 :
360 : // Second, proceed with role deletions.
361 0 : info!("processing role deletions");
362 : let mut xact = client.transaction()?;
363 : for op in ops {
364 : // We do not check either role exists or not,
365 : // Postgres will take care of it for us
366 : if op.action == "delete_role" {
367 : let query: String = format!("DROP ROLE IF EXISTS {}", &op.name.pg_quote());
368 :
369 0 : warn!("deleting role '{}'", &op.name);
370 : xact.execute(query.as_str(), &[])?;
371 : }
372 : }
373 : xact.commit()?;
374 : }
375 :
376 : Ok(())
377 : }
378 :
379 0 : fn reassign_owned_objects_in_one_db(
380 0 : conf: Config,
381 0 : role_name: &PgIdent,
382 0 : db_owner: &PgIdent,
383 0 : ) -> Result<()> {
384 0 : let mut client = conf.connect(NoTls)?;
385 :
386 : // This will reassign all dependent objects to the db owner
387 0 : let reassign_query = format!(
388 0 : "REASSIGN OWNED BY {} TO {}",
389 0 : role_name.pg_quote(),
390 0 : db_owner.pg_quote()
391 0 : );
392 0 : info!(
393 0 : "reassigning objects owned by '{}' in db '{}' to '{}'",
394 0 : role_name,
395 0 : conf.get_dbname().unwrap_or(""),
396 0 : db_owner
397 0 : );
398 0 : client.simple_query(&reassign_query)?;
399 :
400 : // This now will only drop privileges of the role
401 0 : let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
402 0 : client.simple_query(&drop_query)?;
403 0 : Ok(())
404 0 : }
405 :
406 : // Reassign all owned objects in all databases to the owner of the database.
407 0 : fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
408 0 : for db in &spec.cluster.databases {
409 0 : if db.owner != *role_name {
410 0 : let mut conf = Config::from_str(connstr)?;
411 0 : conf.dbname(&db.name);
412 0 : reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
413 0 : }
414 : }
415 :
416 : // Also handle case when there are no databases in the spec.
417 : // In this case we need to reassign objects in the default database.
418 0 : let conf = Config::from_str(connstr)?;
419 0 : let db_owner = PgIdent::from_str("cloud_admin")?;
420 0 : reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
421 :
422 0 : Ok(())
423 0 : }
424 :
425 : /// It follows mostly the same logic as `handle_roles()` excepting that we
426 : /// does not use an explicit transactions block, since major database operations
427 : /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
428 : /// atomicity should be enough here due to the order of operations and various checks,
429 : /// which together provide us idempotency.
430 229 : #[instrument(skip_all)]
431 : pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
432 : let existing_dbs = get_existing_dbs(client)?;
433 :
434 : // Print a list of existing Postgres databases (only in debug mode)
435 229 : if span_enabled!(Level::INFO) {
436 : let mut vec = Vec::new();
437 : for (dbname, db) in &existing_dbs {
438 : vec.push(format!("{}:{}", dbname, db.owner));
439 : }
440 229 : info!("postgres databases (total {}): {:?}", vec.len(), vec);
441 : }
442 :
443 : // Process delta operations first
444 : if let Some(ops) = &spec.delta_operations {
445 0 : info!("processing delta operations on databases");
446 : for op in ops {
447 : match op.action.as_ref() {
448 : // We do not check either DB exists or not,
449 : // Postgres will take care of it for us
450 : "delete_db" => {
451 : // In Postgres we can't drop a database if it is a template.
452 : // So we need to unset the template flag first, but it could
453 : // be a retry, so we could've already dropped the database.
454 : // Check that database exists first to make it idempotent.
455 : let unset_template_query: String = format!(
456 : "
457 : DO $$
458 : BEGIN
459 : IF EXISTS(
460 : SELECT 1
461 : FROM pg_catalog.pg_database
462 : WHERE datname = {}
463 : )
464 : THEN
465 : ALTER DATABASE {} is_template false;
466 : END IF;
467 : END
468 : $$;",
469 : escape_literal(&op.name),
470 : &op.name.pg_quote()
471 : );
472 : // Use FORCE to drop database even if there are active connections.
473 : // We run this from `cloud_admin`, so it should have enough privileges.
474 : // NB: there could be other db states, which prevent us from dropping
475 : // the database. For example, if db is used by any active subscription
476 : // or replication slot.
477 : // TODO: deal with it once we allow logical replication. Proper fix should
478 : // involve returning an error code to the control plane, so it could
479 : // figure out that this is a non-retryable error, return it to the user
480 : // and fail operation permanently.
481 : let drop_db_query: String = format!(
482 : "DROP DATABASE IF EXISTS {} WITH (FORCE)",
483 : &op.name.pg_quote()
484 : );
485 :
486 0 : warn!("deleting database '{}'", &op.name);
487 : client.execute(unset_template_query.as_str(), &[])?;
488 : client.execute(drop_db_query.as_str(), &[])?;
489 : }
490 : "rename_db" => {
491 : let new_name = op.new_name.as_ref().unwrap();
492 :
493 : if existing_dbs.get(&op.name).is_some() {
494 : let query: String = format!(
495 : "ALTER DATABASE {} RENAME TO {}",
496 : op.name.pg_quote(),
497 : new_name.pg_quote()
498 : );
499 :
500 0 : warn!("renaming database '{}' to '{}'", op.name, new_name);
501 : client.execute(query.as_str(), &[])?;
502 : }
503 : }
504 : _ => {}
505 : }
506 : }
507 : }
508 :
509 : // Refresh Postgres databases info to handle possible renames
510 : let existing_dbs = get_existing_dbs(client)?;
511 :
512 229 : info!(
513 229 : "handling cluster spec databases (total {})",
514 229 : spec.cluster.databases.len()
515 229 : );
516 : for db in &spec.cluster.databases {
517 : let name = &db.name;
518 : let pg_db = existing_dbs.get(name);
519 :
520 : enum DatabaseAction {
521 : None,
522 : Update,
523 : Create,
524 : }
525 : let action = if let Some(r) = pg_db {
526 : // XXX: db owner name is returned as quoted string from Postgres,
527 : // when quoting is needed.
528 : let new_owner = if r.owner.starts_with('"') {
529 : db.owner.pg_quote()
530 : } else {
531 : db.owner.clone()
532 : };
533 :
534 : if new_owner != r.owner {
535 : // Update the owner
536 : DatabaseAction::Update
537 : } else {
538 : DatabaseAction::None
539 : }
540 : } else {
541 : DatabaseAction::Create
542 : };
543 :
544 : match action {
545 : DatabaseAction::None => {}
546 : DatabaseAction::Update => {
547 : let query: String = format!(
548 : "ALTER DATABASE {} OWNER TO {}",
549 : name.pg_quote(),
550 : db.owner.pg_quote()
551 : );
552 : let _guard = info_span!("executing", query).entered();
553 : client.execute(query.as_str(), &[])?;
554 : }
555 : DatabaseAction::Create => {
556 : let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
557 : query.push_str(&db.to_pg_options());
558 : let _guard = info_span!("executing", query).entered();
559 : client.execute(query.as_str(), &[])?;
560 : let grant_query: String = format!(
561 : "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
562 : name.pg_quote()
563 : );
564 : client.execute(grant_query.as_str(), &[])?;
565 : }
566 : };
567 :
568 0 : if span_enabled!(Level::INFO) {
569 : let action_str = match action {
570 : DatabaseAction::None => "",
571 : DatabaseAction::Create => " -> create",
572 : DatabaseAction::Update => " -> update",
573 : };
574 0 : info!(" - {}:{}{}", db.name, db.owner, action_str);
575 : }
576 : }
577 :
578 : Ok(())
579 : }
580 :
581 : /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
582 : /// to allow users creating trusted extensions and re-creating `public` schema, for example.
583 229 : #[instrument(skip_all)]
584 : pub fn handle_grants(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
585 229 : info!("modifying database permissions");
586 : let existing_dbs = get_existing_dbs(client)?;
587 :
588 : // Do some per-database access adjustments. We'd better do this at db creation time,
589 : // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
590 : // atomically.
591 : for db in &spec.cluster.databases {
592 : match existing_dbs.get(&db.name) {
593 : Some(pg_db) => {
594 : if pg_db.restrict_conn || pg_db.invalid {
595 0 : info!(
596 0 : "skipping grants for db {} (invalid: {}, connections not allowed: {})",
597 0 : db.name, pg_db.invalid, pg_db.restrict_conn
598 0 : );
599 : continue;
600 : }
601 : }
602 : None => {
603 : bail!(
604 : "database {} doesn't exist in Postgres after handle_databases()",
605 : db.name
606 : );
607 : }
608 : }
609 :
610 : let mut conf = Config::from_str(connstr)?;
611 : conf.dbname(&db.name);
612 :
613 : let mut db_client = conf.connect(NoTls)?;
614 :
615 : // This will only change ownership on the schema itself, not the objects
616 : // inside it. Without it owner of the `public` schema will be `cloud_admin`
617 : // and database owner cannot do anything with it. SQL procedure ensures
618 : // that it won't error out if schema `public` doesn't exist.
619 : let alter_query = format!(
620 : "DO $$\n\
621 : DECLARE\n\
622 : schema_owner TEXT;\n\
623 : BEGIN\n\
624 : IF EXISTS(\n\
625 : SELECT nspname\n\
626 : FROM pg_catalog.pg_namespace\n\
627 : WHERE nspname = 'public'\n\
628 : )\n\
629 : THEN\n\
630 : SELECT nspowner::regrole::text\n\
631 : FROM pg_catalog.pg_namespace\n\
632 : WHERE nspname = 'public'\n\
633 : INTO schema_owner;\n\
634 : \n\
635 : IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
636 : THEN\n\
637 : ALTER SCHEMA public OWNER TO {};\n\
638 : END IF;\n\
639 : END IF;\n\
640 : END\n\
641 : $$;",
642 : db.owner.pg_quote()
643 : );
644 : db_client.simple_query(&alter_query)?;
645 :
646 : // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
647 : // This is needed because since postgres 15 this privilege is removed by default.
648 : // TODO: web_access isn't created for almost 1 year. It could be that we have
649 : // active users of 1 year old projects, but hopefully not, so check it and
650 : // remove this code if possible. The worst thing that could happen is that
651 : // user won't be able to use public schema in NEW databases created in the
652 : // very OLD project.
653 : let grant_query = "DO $$\n\
654 : BEGIN\n\
655 : IF EXISTS(\n\
656 : SELECT nspname\n\
657 : FROM pg_catalog.pg_namespace\n\
658 : WHERE nspname = 'public'\n\
659 : ) AND\n\
660 : current_setting('server_version_num')::int/10000 >= 15\n\
661 : THEN\n\
662 : IF EXISTS(\n\
663 : SELECT rolname\n\
664 : FROM pg_catalog.pg_roles\n\
665 : WHERE rolname = 'web_access'\n\
666 : )\n\
667 : THEN\n\
668 : GRANT CREATE ON SCHEMA public TO web_access;\n\
669 : END IF;\n\
670 : END IF;\n\
671 : END\n\
672 : $$;"
673 : .to_string();
674 :
675 0 : info!(
676 0 : "grant query for db {} : {}",
677 0 : &db.name,
678 0 : inlinify(&grant_query)
679 0 : );
680 : db_client.simple_query(&grant_query)?;
681 : }
682 :
683 : Ok(())
684 : }
685 :
686 : /// Create required system extensions
687 229 : #[instrument(skip_all)]
688 : pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
689 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
690 : if libs.contains("pg_stat_statements") {
691 : // Create extension only if this compute really needs it
692 : let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
693 0 : info!("creating system extensions with query: {}", query);
694 : client.simple_query(query)?;
695 : }
696 : }
697 :
698 : Ok(())
699 : }
700 :
701 : /// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
702 229 : #[instrument(skip_all)]
703 : pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
704 229 : info!("handle extension neon");
705 :
706 : let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
707 : client.simple_query(query)?;
708 :
709 : query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
710 229 : info!("create neon extension with query: {}", query);
711 : client.simple_query(query)?;
712 :
713 : query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
714 : client.simple_query(query)?;
715 :
716 : query = "ALTER EXTENSION neon SET SCHEMA neon";
717 229 : info!("alter neon extension schema with query: {}", query);
718 : client.simple_query(query)?;
719 :
720 : // this will be a no-op if extension is already up to date,
721 : // which may happen in two cases:
722 : // - extension was just installed
723 : // - extension was already installed and is up to date
724 : let query = "ALTER EXTENSION neon UPDATE";
725 229 : info!("update neon extension schema with query: {}", query);
726 : client.simple_query(query)?;
727 :
728 : Ok(())
729 : }
730 :
731 4 : #[instrument(skip_all)]
732 : pub fn handle_migrations(client: &mut Client) -> Result<()> {
733 4 : info!("handle migrations");
734 :
735 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
736 : // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
737 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
738 :
739 : let migrations = [
740 : "ALTER ROLE neon_superuser BYPASSRLS",
741 : r#"
742 : DO $$
743 : DECLARE
744 : role_name text;
745 : BEGIN
746 : FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, 'neon_superuser', 'member')
747 : LOOP
748 : RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
749 : EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
750 : END LOOP;
751 :
752 : FOR role_name IN SELECT rolname FROM pg_roles
753 : WHERE
754 : NOT pg_has_role(rolname, 'neon_superuser', 'member') AND NOT starts_with(rolname, 'pg_')
755 : LOOP
756 : RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
757 : EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
758 : END LOOP;
759 : END $$;
760 : "#,
761 : r#"
762 : DO $$
763 : BEGIN
764 : IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
765 : EXECUTE 'GRANT pg_create_subscription TO neon_superuser';
766 : END IF;
767 : END
768 : $$;"#,
769 : ];
770 :
771 : let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
772 : client.simple_query(query)?;
773 :
774 : query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
775 : client.simple_query(query)?;
776 :
777 : query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
778 : client.simple_query(query)?;
779 :
780 : query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
781 : client.simple_query(query)?;
782 :
783 : query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
784 : client.simple_query(query)?;
785 :
786 : query = "SELECT id FROM neon_migration.migration_id";
787 : let row = client.query_one(query, &[])?;
788 : let mut current_migration: usize = row.get::<&str, i64>("id") as usize;
789 : let starting_migration_id = current_migration;
790 :
791 : query = "BEGIN";
792 : client.simple_query(query)?;
793 :
794 : while current_migration < migrations.len() {
795 9 : info!("Running migration:\n{}\n", migrations[current_migration]);
796 : client.simple_query(migrations[current_migration])?;
797 : current_migration += 1;
798 : }
799 : let setval = format!(
800 : "UPDATE neon_migration.migration_id SET id={}",
801 : migrations.len()
802 : );
803 : client.simple_query(&setval)?;
804 :
805 : query = "COMMIT";
806 : client.simple_query(query)?;
807 :
808 4 : info!(
809 4 : "Ran {} migrations",
810 4 : (migrations.len() - starting_migration_id)
811 4 : );
812 : Ok(())
813 : }
|