|             Line data    Source code 
       1              : use anyhow::{anyhow, bail, Result};
       2              : use reqwest::StatusCode;
       3              : use std::fs::File;
       4              : use std::path::Path;
       5              : use tokio_postgres::Client;
       6              : use tracing::{error, info, instrument, warn};
       7              : 
       8              : use crate::config;
       9              : use crate::metrics::{CPlaneRequestRPC, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
      10              : use crate::migration::MigrationRunner;
      11              : use crate::params::PG_HBA_ALL_MD5;
      12              : use crate::pg_helpers::*;
      13              : 
      14              : use compute_api::responses::{
      15              :     ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
      16              : };
      17              : use compute_api::spec::ComputeSpec;
      18              : 
      19              : // Do control plane request and return response if any. In case of error it
      20              : // returns a bool flag indicating whether it makes sense to retry the request
      21              : // and a string with error message.
      22            0 : fn do_control_plane_request(
      23            0 :     uri: &str,
      24            0 :     jwt: &str,
      25            0 : ) -> Result<ControlPlaneSpecResponse, (bool, String, String)> {
      26            0 :     let resp = reqwest::blocking::Client::new()
      27            0 :         .get(uri)
      28            0 :         .header("Authorization", format!("Bearer {}", jwt))
      29            0 :         .send()
      30            0 :         .map_err(|e| {
      31            0 :             (
      32            0 :                 true,
      33            0 :                 format!("could not perform spec request to control plane: {:?}", e),
      34            0 :                 UNKNOWN_HTTP_STATUS.to_string(),
      35            0 :             )
      36            0 :         })?;
      37              : 
      38            0 :     let status = resp.status();
      39            0 :     match status {
      40            0 :         StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
      41            0 :             Ok(spec_resp) => Ok(spec_resp),
      42            0 :             Err(e) => Err((
      43            0 :                 true,
      44            0 :                 format!("could not deserialize control plane response: {:?}", e),
      45            0 :                 status.to_string(),
      46            0 :             )),
      47              :         },
      48            0 :         StatusCode::SERVICE_UNAVAILABLE => Err((
      49            0 :             true,
      50            0 :             "control plane is temporarily unavailable".to_string(),
      51            0 :             status.to_string(),
      52            0 :         )),
      53              :         StatusCode::BAD_GATEWAY => {
      54              :             // We have a problem with intermittent 502 errors now
      55              :             // https://github.com/neondatabase/cloud/issues/2353
      56              :             // It's fine to retry GET request in this case.
      57            0 :             Err((
      58            0 :                 true,
      59            0 :                 "control plane request failed with 502".to_string(),
      60            0 :                 status.to_string(),
      61            0 :             ))
      62              :         }
      63              :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      64              :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      65            0 :         _ => Err((
      66            0 :             false,
      67            0 :             format!("unexpected control plane response status code: {}", status),
      68            0 :             status.to_string(),
      69            0 :         )),
      70              :     }
      71            0 : }
      72              : 
      73              : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
      74              : /// env variable is set, it will be used for authorization.
      75            0 : pub fn get_spec_from_control_plane(
      76            0 :     base_uri: &str,
      77            0 :     compute_id: &str,
      78            0 : ) -> Result<(Option<ComputeSpec>, ComputeCtlConfig)> {
      79            0 :     let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
      80            0 :     let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
      81            0 :         Ok(v) => v,
      82            0 :         Err(_) => "".to_string(),
      83              :     };
      84            0 :     let mut attempt = 1;
      85            0 : 
      86            0 :     info!("getting spec from control plane: {}", cp_uri);
      87              : 
      88              :     // Do 3 attempts to get spec from the control plane using the following logic:
      89              :     // - network error -> then retry
      90              :     // - compute id is unknown or any other error -> bail out
      91              :     // - no spec for compute yet (Empty state) -> return Ok(None)
      92              :     // - got spec -> return Ok(Some(spec))
      93            0 :     while attempt < 4 {
      94            0 :         let result = match do_control_plane_request(&cp_uri, &jwt) {
      95            0 :             Ok(spec_resp) => {
      96            0 :                 CPLANE_REQUESTS_TOTAL
      97            0 :                     .with_label_values(&[
      98            0 :                         CPlaneRequestRPC::GetSpec.as_str(),
      99            0 :                         &StatusCode::OK.to_string(),
     100            0 :                     ])
     101            0 :                     .inc();
     102            0 :                 match spec_resp.status {
     103            0 :                     ControlPlaneComputeStatus::Empty => Ok((None, spec_resp.compute_ctl_config)),
     104              :                     ControlPlaneComputeStatus::Attached => {
     105            0 :                         if let Some(spec) = spec_resp.spec {
     106            0 :                             Ok((Some(spec), spec_resp.compute_ctl_config))
     107              :                         } else {
     108            0 :                             bail!("compute is attached, but spec is empty")
     109              :                         }
     110              :                     }
     111              :                 }
     112              :             }
     113            0 :             Err((retry, msg, status)) => {
     114            0 :                 CPLANE_REQUESTS_TOTAL
     115            0 :                     .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status])
     116            0 :                     .inc();
     117            0 :                 if retry {
     118            0 :                     Err(anyhow!(msg))
     119              :                 } else {
     120            0 :                     bail!(msg);
     121              :                 }
     122              :             }
     123              :         };
     124              : 
     125            0 :         if let Err(e) = &result {
     126            0 :             error!("attempt {} to get spec failed with: {}", attempt, e);
     127              :         } else {
     128            0 :             return result;
     129              :         }
     130              : 
     131            0 :         attempt += 1;
     132            0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     133              :     }
     134              : 
     135              :     // All attempts failed, return error.
     136            0 :     Err(anyhow::anyhow!(
     137            0 :         "Exhausted all attempts to retrieve the spec from the control plane"
     138            0 :     ))
     139            0 : }
     140              : 
     141              : /// Check `pg_hba.conf` and update if needed to allow external connections.
     142            0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     143            0 :     // XXX: consider making it a part of spec.json
     144            0 :     info!("checking pg_hba.conf");
     145            0 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     146            0 : 
     147            0 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     148            0 :         info!("updated pg_hba.conf to allow external connections");
     149              :     } else {
     150            0 :         info!("pg_hba.conf is up-to-date");
     151              :     }
     152              : 
     153            0 :     Ok(())
     154            0 : }
     155              : 
     156              : /// Create a standby.signal file
     157            0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     158            0 :     // XXX: consider making it a part of spec.json
     159            0 :     info!("adding standby.signal");
     160            0 :     let signalfile = pgdata_path.join("standby.signal");
     161            0 : 
     162            0 :     if !signalfile.exists() {
     163            0 :         info!("created standby.signal");
     164            0 :         File::create(signalfile)?;
     165              :     } else {
     166            0 :         info!("reused pre-existing standby.signal");
     167              :     }
     168            0 :     Ok(())
     169            0 : }
     170              : 
     171              : #[instrument(skip_all)]
     172              : pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
     173              :     info!("handle neon extension upgrade");
     174              :     let query = "ALTER EXTENSION neon UPDATE";
     175              :     info!("update neon extension version with query: {}", query);
     176              :     client.simple_query(query).await?;
     177              : 
     178              :     Ok(())
     179              : }
     180              : 
     181              : #[instrument(skip_all)]
     182              : pub async fn handle_migrations(client: &mut Client) -> Result<()> {
     183              :     info!("handle migrations");
     184              : 
     185              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     186              :     // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
     187              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     188              : 
     189              :     // Add new migrations in numerical order.
     190              :     let migrations = [
     191              :         include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
     192              :         include_str!("./migrations/0002-alter_roles.sql"),
     193              :         include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
     194              :         include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
     195              :         include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
     196              :         include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
     197              :         include_str!(
     198              :             "./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
     199              :         ),
     200              :         include_str!(
     201              :             "./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
     202              :         ),
     203              :         include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
     204              :         include_str!(
     205              :             "./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
     206              :         ),
     207              :         include_str!(
     208              :             "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
     209              :         ),
     210              :     ];
     211              : 
     212              :     MigrationRunner::new(client, &migrations)
     213              :         .run_migrations()
     214              :         .await?;
     215              : 
     216              :     Ok(())
     217              : }
     218              : 
     219              : /// Connect to the database as superuser and pre-create anon extension
     220              : /// if it is present in shared_preload_libraries
     221              : #[instrument(skip_all)]
     222              : pub async fn handle_extension_anon(
     223              :     spec: &ComputeSpec,
     224              :     db_owner: &str,
     225              :     db_client: &mut Client,
     226              :     grants_only: bool,
     227              : ) -> Result<()> {
     228              :     info!("handle extension anon");
     229              : 
     230              :     if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
     231              :         if libs.contains("anon") {
     232              :             if !grants_only {
     233              :                 // check if extension is already initialized using anon.is_initialized()
     234              :                 let query = "SELECT anon.is_initialized()";
     235              :                 match db_client.query(query, &[]).await {
     236              :                     Ok(rows) => {
     237              :                         if !rows.is_empty() {
     238              :                             let is_initialized: bool = rows[0].get(0);
     239              :                             if is_initialized {
     240              :                                 info!("anon extension is already initialized");
     241              :                                 return Ok(());
     242              :                             }
     243              :                         }
     244              :                     }
     245              :                     Err(e) => {
     246              :                         warn!(
     247              :                             "anon extension is_installed check failed with expected error: {}",
     248              :                             e
     249              :                         );
     250              :                     }
     251              :                 };
     252              : 
     253              :                 // Create anon extension if this compute needs it
     254              :                 // Users cannot create it themselves, because superuser is required.
     255              :                 let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
     256              :                 info!("creating anon extension with query: {}", query);
     257              :                 match db_client.query(query, &[]).await {
     258              :                     Ok(_) => {}
     259              :                     Err(e) => {
     260              :                         error!("anon extension creation failed with error: {}", e);
     261              :                         return Ok(());
     262              :                     }
     263              :                 }
     264              : 
     265              :                 // check that extension is installed
     266              :                 query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
     267              :                 let rows = db_client.query(query, &[]).await?;
     268              :                 if rows.is_empty() {
     269              :                     error!("anon extension is not installed");
     270              :                     return Ok(());
     271              :                 }
     272              : 
     273              :                 // Initialize anon extension
     274              :                 // This also requires superuser privileges, so users cannot do it themselves.
     275              :                 query = "SELECT anon.init()";
     276              :                 match db_client.query(query, &[]).await {
     277              :                     Ok(_) => {}
     278              :                     Err(e) => {
     279              :                         error!("anon.init() failed with error: {}", e);
     280              :                         return Ok(());
     281              :                     }
     282              :                 }
     283              :             }
     284              : 
     285              :             // check that extension is installed, if not bail early
     286              :             let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
     287              :             match db_client.query(query, &[]).await {
     288              :                 Ok(rows) => {
     289              :                     if rows.is_empty() {
     290              :                         error!("anon extension is not installed");
     291              :                         return Ok(());
     292              :                     }
     293              :                 }
     294              :                 Err(e) => {
     295              :                     error!("anon extension check failed with error: {}", e);
     296              :                     return Ok(());
     297              :                 }
     298              :             };
     299              : 
     300              :             let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
     301              :             info!("granting anon extension permissions with query: {}", query);
     302              :             db_client.simple_query(&query).await?;
     303              : 
     304              :             // Grant permissions to db_owner to use anon extension functions
     305              :             let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
     306              :             info!("granting anon extension permissions with query: {}", query);
     307              :             db_client.simple_query(&query).await?;
     308              : 
     309              :             // This is needed, because some functions are defined as SECURITY DEFINER.
     310              :             // In Postgres SECURITY DEFINER functions are executed with the privileges
     311              :             // of the owner.
     312              :             // In anon extension this it is needed to access some GUCs, which are only accessible to
     313              :             // superuser. But we've patched postgres to allow db_owner to access them as well.
     314              :             // So we need to change owner of these functions to db_owner.
     315              :             let query = format!("
     316              :                 SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};'
     317              :                 from pg_proc p
     318              :                 join pg_namespace nsp ON p.pronamespace = nsp.oid
     319              :                 where nsp.nspname = 'anon';", db_owner);
     320              : 
     321              :             info!("change anon extension functions owner to db owner");
     322              :             db_client.simple_query(&query).await?;
     323              : 
     324              :             //  affects views as well
     325              :             let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
     326              :             info!("granting anon extension permissions with query: {}", query);
     327              :             db_client.simple_query(&query).await?;
     328              : 
     329              :             let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
     330              :             info!("granting anon extension permissions with query: {}", query);
     331              :             db_client.simple_query(&query).await?;
     332              :         }
     333              :     }
     334              : 
     335              :     Ok(())
     336              : }
         |