LCOV - code coverage report
Current view: top level - compute_tools/src - spec.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 90 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 5 0

            Line data    Source code
       1              : use std::fs::File;
       2              : use std::path::Path;
       3              : 
       4              : use anyhow::{Result, anyhow, bail};
       5              : use compute_api::responses::{
       6              :     ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
       7              : };
       8              : use reqwest::StatusCode;
       9              : use tokio_postgres::Client;
      10              : use tracing::{error, info, instrument};
      11              : 
      12              : use crate::compute::ComputeNodeParams;
      13              : use crate::config;
      14              : use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
      15              : use crate::migration::MigrationRunner;
      16              : use crate::params::PG_HBA_ALL_MD5;
      17              : 
      18              : // Do control plane request and return response if any. In case of error it
      19              : // returns a bool flag indicating whether it makes sense to retry the request
      20              : // and a string with error message.
      21            0 : fn do_control_plane_request(
      22            0 :     uri: &str,
      23            0 :     jwt: &str,
      24            0 : ) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
      25            0 :     let resp = reqwest::blocking::Client::new()
      26            0 :         .get(uri)
      27            0 :         .header("Authorization", format!("Bearer {jwt}"))
      28            0 :         .send()
      29            0 :         .map_err(|e| {
      30            0 :             (
      31            0 :                 true,
      32            0 :                 format!("could not perform request to control plane: {e:?}"),
      33            0 :                 UNKNOWN_HTTP_STATUS.to_string(),
      34            0 :             )
      35            0 :         })?;
      36              : 
      37            0 :     let status = resp.status();
      38            0 :     match status {
      39            0 :         StatusCode::OK => match resp.json::<ControlPlaneConfigResponse>() {
      40            0 :             Ok(spec_resp) => Ok(spec_resp),
      41            0 :             Err(e) => Err((
      42            0 :                 true,
      43            0 :                 format!("could not deserialize control plane response: {e:?}"),
      44            0 :                 status.to_string(),
      45            0 :             )),
      46              :         },
      47            0 :         StatusCode::SERVICE_UNAVAILABLE => Err((
      48            0 :             true,
      49            0 :             "control plane is temporarily unavailable".to_string(),
      50            0 :             status.to_string(),
      51            0 :         )),
      52              :         StatusCode::BAD_GATEWAY => {
      53              :             // We have a problem with intermittent 502 errors now
      54              :             // https://github.com/neondatabase/cloud/issues/2353
      55              :             // It's fine to retry GET request in this case.
      56            0 :             Err((
      57            0 :                 true,
      58            0 :                 "control plane request failed with 502".to_string(),
      59            0 :                 status.to_string(),
      60            0 :             ))
      61              :         }
      62              :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      63              :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      64            0 :         _ => Err((
      65            0 :             false,
      66            0 :             format!("unexpected control plane response status code: {status}"),
      67            0 :             status.to_string(),
      68            0 :         )),
      69              :     }
      70            0 : }
      71              : 
      72              : /// Request config from the control-plane by compute_id. If
      73              : /// `NEON_CONTROL_PLANE_TOKEN` env variable is set, it will be used for
      74              : /// authorization.
      75            0 : pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeConfig> {
      76            0 :     let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
      77            0 :     let jwt: String = std::env::var("NEON_CONTROL_PLANE_TOKEN").unwrap_or_default();
      78            0 :     let mut attempt = 1;
      79              : 
      80            0 :     info!("getting config from control plane: {}", cp_uri);
      81              : 
      82              :     // Do 3 attempts to get spec from the control plane using the following logic:
      83              :     // - network error -> then retry
      84              :     // - compute id is unknown or any other error -> bail out
      85              :     // - no spec for compute yet (Empty state) -> return Ok(None)
      86              :     // - got config -> return Ok(Some(config))
      87            0 :     while attempt < 4 {
      88            0 :         let result = match do_control_plane_request(&cp_uri, &jwt) {
      89            0 :             Ok(config_resp) => {
      90            0 :                 CPLANE_REQUESTS_TOTAL
      91            0 :                     .with_label_values(&[
      92            0 :                         CPlaneRequestRPC::GetConfig.as_str(),
      93            0 :                         &StatusCode::OK.to_string(),
      94            0 :                     ])
      95            0 :                     .inc();
      96            0 :                 match config_resp.status {
      97            0 :                     ControlPlaneComputeStatus::Empty => Ok(config_resp.into()),
      98              :                     ControlPlaneComputeStatus::Attached => {
      99            0 :                         if config_resp.spec.is_some() {
     100            0 :                             Ok(config_resp.into())
     101              :                         } else {
     102            0 :                             bail!("compute is attached, but spec is empty")
     103              :                         }
     104              :                     }
     105              :                 }
     106              :             }
     107            0 :             Err((retry, msg, status)) => {
     108            0 :                 CPLANE_REQUESTS_TOTAL
     109            0 :                     .with_label_values(&[CPlaneRequestRPC::GetConfig.as_str(), &status])
     110            0 :                     .inc();
     111            0 :                 if retry {
     112            0 :                     Err(anyhow!(msg))
     113              :                 } else {
     114            0 :                     bail!(msg);
     115              :                 }
     116              :             }
     117              :         };
     118              : 
     119            0 :         if let Err(e) = &result {
     120            0 :             error!("attempt {} to get config failed with: {}", attempt, e);
     121              :         } else {
     122            0 :             return result;
     123              :         }
     124              : 
     125            0 :         attempt += 1;
     126            0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     127              :     }
     128              : 
     129              :     // All attempts failed, return error.
     130            0 :     Err(anyhow::anyhow!(
     131            0 :         "Exhausted all attempts to retrieve the config from the control plane"
     132            0 :     ))
     133            0 : }
     134              : 
     135              : /// Check `pg_hba.conf` and update if needed to allow external connections.
     136            0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
     137              :     // XXX: consider making it a part of config.json
     138            0 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     139              : 
     140            0 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     141            0 :         info!("updated pg_hba.conf to allow external connections");
     142              :     } else {
     143            0 :         info!("pg_hba.conf is up-to-date");
     144              :     }
     145              : 
     146            0 :     Ok(())
     147            0 : }
     148              : 
     149              : /// Create a standby.signal file
     150            0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     151              :     // XXX: consider making it a part of config.json
     152            0 :     let signalfile = pgdata_path.join("standby.signal");
     153              : 
     154            0 :     if !signalfile.exists() {
     155            0 :         File::create(signalfile)?;
     156            0 :         info!("created standby.signal");
     157              :     } else {
     158            0 :         info!("reused pre-existing standby.signal");
     159              :     }
     160            0 :     Ok(())
     161            0 : }
     162              : 
     163              : #[instrument(skip_all)]
     164              : pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
     165              :     let query = "ALTER EXTENSION neon UPDATE";
     166              :     info!("update neon extension version with query: {}", query);
     167              :     client.simple_query(query).await?;
     168              : 
     169              :     Ok(())
     170              : }
     171              : 
/// Build the ordered list of SQL migrations and hand it to `MigrationRunner`.
///
/// Each migration is the text of a file under `./migrations/`, embedded at
/// compile time with `include_str!`. All entries except `0009` are used as
/// `format!` templates with `privileged_role_name` substituted from `params`;
/// `0009` is passed through verbatim.
///
/// # Errors
///
/// Propagates any error returned by `MigrationRunner::run_migrations`.
#[instrument(skip_all)]
pub async fn handle_migrations(params: ComputeNodeParams, client: &mut Client) -> Result<()> {
    info!("handle migrations");

    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    // NOTE(review): presumably the runner identifies already-applied migrations
    // by their position in this array, which is why reordering or inserting in
    // the middle is dangerous -- confirm against `MigrationRunner`.

    // Add new migrations in numerical order.
    //
    // Files wrapped in `format!` are format templates: any literal `{` or `}`
    // in them must be written as `{{` / `}}`. A file included without
    // `format!` (0009) is used as-is, so braces in it are taken literally.
    let migrations = [
        &format!(
            include_str!("./migrations/0001-add_bypass_rls_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0002-alter_roles.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0003-grant_pg_create_subscription_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0004-grant_pg_monitor_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0005-grant_all_on_tables_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0006-grant_all_on_sequences_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!(
                "./migrations/0007-grant_all_on_tables_with_grant_option_to_privileged_role.sql"
            ),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!(
                "./migrations/0008-grant_all_on_sequences_with_grant_option_to_privileged_role.sql"
            ),
            privileged_role_name = params.privileged_role_name
        ),
        // The only migration without a role-name placeholder: included verbatim.
        include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
        &format!(
            include_str!(
                "./migrations/0010-grant_snapshot_synchronization_funcs_to_privileged_role.sql"
            ),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!(
                "./migrations/0011-grant_pg_show_replication_origin_status_to_privileged_role.sql"
            ),
            privileged_role_name = params.privileged_role_name
        ),
        &format!(
            include_str!("./migrations/0012-grant_pg_signal_backend_to_privileged_role.sql"),
            privileged_role_name = params.privileged_role_name
        ),
    ];

    // Execute the migrations over the supplied connection.
    MigrationRunner::new(client, &migrations)
        .run_migrations()
        .await?;

    Ok(())
}
        

Generated by: LCOV version 2.1-beta