LCOV - code coverage report
Current view: top level - compute_tools/src - spec.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 0.0 % 134 0
Test Date: 2025-07-26 17:20:05 Functions: 0.0 % 7 0

            Line data    Source code
       1              : use std::fs::File;
       2              : use std::fs::{self, Permissions};
       3              : use std::os::unix::fs::PermissionsExt;
       4              : use std::path::Path;
       5              : 
       6              : use anyhow::{Result, anyhow, bail};
       7              : use compute_api::responses::{
       8              :     ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
       9              : };
      10              : use reqwest::StatusCode;
      11              : use tokio_postgres::Client;
      12              : use tracing::{error, info, instrument};
      13              : 
      14              : use crate::compute::ComputeNodeParams;
      15              : use crate::config;
      16              : use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
      17              : use crate::migration::MigrationRunner;
      18              : use crate::params::PG_HBA_ALL_MD5;
      19              : 
      20              : // Do control plane request and return response if any. In case of error it
      21              : // returns a tuple of: a bool flag indicating whether it makes sense to retry
      22              : // the request, a string with the error message, and the response status (or UNKNOWN_HTTP_STATUS) as a string.
      23            0 : fn do_control_plane_request(
      24            0 :     uri: &str,
      25            0 :     jwt: &str,
      26            0 : ) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
      27            0 :     let resp = reqwest::blocking::Client::new()
      28            0 :         .get(uri)
      29            0 :         .header("Authorization", format!("Bearer {jwt}"))
      30            0 :         .send()
      31            0 :         .map_err(|e| {
      32            0 :             (
      33            0 :                 true,
      34            0 :                 format!("could not perform request to control plane: {e:?}"),
      35            0 :                 UNKNOWN_HTTP_STATUS.to_string(),
      36            0 :             )
      37            0 :         })?;
      38              : 
      39            0 :     let status = resp.status();
      40            0 :     match status {
      41            0 :         StatusCode::OK => match resp.json::<ControlPlaneConfigResponse>() {
      42            0 :             Ok(spec_resp) => Ok(spec_resp),
      43            0 :             Err(e) => Err((
      44            0 :                 true,
      45            0 :                 format!("could not deserialize control plane response: {e:?}"),
      46            0 :                 status.to_string(),
      47            0 :             )),
      48              :         },
      49            0 :         StatusCode::SERVICE_UNAVAILABLE => Err((
      50            0 :             true,
      51            0 :             "control plane is temporarily unavailable".to_string(),
      52            0 :             status.to_string(),
      53            0 :         )),
      54              :         StatusCode::BAD_GATEWAY => {
      55              :             // We have a problem with intermittent 502 errors now
      56              :             // https://github.com/neondatabase/cloud/issues/2353
      57              :             // It's fine to retry GET request in this case.
      58            0 :             Err((
      59            0 :                 true,
      60            0 :                 "control plane request failed with 502".to_string(),
      61            0 :                 status.to_string(),
      62            0 :             ))
      63              :         }
      64              :         // Another code, likely 500 or 404, means that compute is unknown to the control plane
      65              :         // or some internal failure happened. Doesn't make much sense to retry in this case.
      66            0 :         _ => Err((
      67            0 :             false,
      68            0 :             format!("unexpected control plane response status code: {status}"),
      69            0 :             status.to_string(),
      70            0 :         )),
      71              :     }
      72            0 : }
      73              : 
      74              : /// Request config from the control-plane by compute_id. If
      75              : /// `NEON_CONTROL_PLANE_TOKEN` env variable is set, it will be used for
      76              : /// authorization.
      77            0 : pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeConfig> {
      78            0 :     let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
      79            0 :     let jwt: String = std::env::var("NEON_CONTROL_PLANE_TOKEN").unwrap_or_default();
      80            0 :     let mut attempt = 1;
      81              : 
      82            0 :     info!("getting config from control plane: {}", cp_uri);
      83              : 
      84              :     // Do 3 attempts to get config from the control plane using the following logic:
      85              :     // - network error, 502/503 response, or undeserializable body -> then retry
      86              :     // - compute id is unknown, Attached state without a spec, or any other error -> bail out
      87              :     // - no spec for compute yet (Empty state) -> return Ok(config) built from the response
      88              :     // - got config (Attached state with a spec) -> return Ok(config)
      89            0 :     while attempt < 4 {
      90            0 :         let result = match do_control_plane_request(&cp_uri, &jwt) {
      91            0 :             Ok(config_resp) => {
      92            0 :                 CPLANE_REQUESTS_TOTAL
      93            0 :                     .with_label_values(&[
      94            0 :                         CPlaneRequestRPC::GetConfig.as_str(),
      95            0 :                         &StatusCode::OK.to_string(),
      96            0 :                     ])
      97            0 :                     .inc();
      98            0 :                 match config_resp.status {
      99            0 :                     ControlPlaneComputeStatus::Empty => Ok(config_resp.into()),
     100              :                     ControlPlaneComputeStatus::Attached => {
     101            0 :                         if config_resp.spec.is_some() {
     102            0 :                             Ok(config_resp.into())
     103              :                         } else {
     104            0 :                             bail!("compute is attached, but spec is empty")
     105              :                         }
     106              :                     }
     107              :                 }
     108              :             }
     109            0 :             Err((retry, msg, status)) => {
     110            0 :                 CPLANE_REQUESTS_TOTAL
     111            0 :                     .with_label_values(&[CPlaneRequestRPC::GetConfig.as_str(), &status])
     112            0 :                     .inc();
     113            0 :                 if retry {
     114            0 :                     Err(anyhow!(msg))
     115              :                 } else {
     116            0 :                     bail!(msg);
     117              :                 }
     118              :             }
     119              :         };
     120              : 
     121            0 :         if let Err(e) = &result {
     122            0 :             error!("attempt {} to get config failed with: {}", attempt, e);
     123              :         } else {
     124            0 :             return result;
     125              :         }
     126              : 
     127            0 :         attempt += 1;
     128            0 :         std::thread::sleep(std::time::Duration::from_millis(100));
     129              :     }
     130              : 
     131              :     // All attempts failed, return error.
     132            0 :     Err(anyhow::anyhow!(
     133            0 :         "Exhausted all attempts to retrieve the config from the control plane"
     134            0 :     ))
     135            0 : }
     136              : 
     137              : /// Check `pg_hba.conf` and update if needed to allow external connections.
     138            0 : pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) -> Result<()> {
     139              :     // XXX: consider making it a part of config.json
     140            0 :     let pghba_path = pgdata_path.join("pg_hba.conf");
     141              : 
     142              :     // Update pg_hba to contain databricks-specific settings before adding neon settings
     143              :     // PG uses the first record that matches to perform authentication, so we need to have
     144              :     // our rules before the default ones from neon.
     145              :     // See https://www.postgresql.org/docs/current/auth-pg-hba-conf.html
     146            0 :     if let Some(databricks_pg_hba) = databricks_pg_hba {
     147            0 :         if config::line_in_file(
     148            0 :             &pghba_path,
     149            0 :             &format!("include_if_exists {}\n", *databricks_pg_hba),
     150            0 :         )? {
     151            0 :             info!("updated pg_hba.conf to include databricks_pg_hba.conf");
     152              :         } else {
     153            0 :             info!("pg_hba.conf already included databricks_pg_hba.conf");
     154              :         }
     155            0 :     }
     156              : 
     157            0 :     if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
     158            0 :         info!("updated pg_hba.conf to allow external connections");
     159              :     } else {
     160            0 :         info!("pg_hba.conf is up-to-date");
     161              :     }
     162              : 
     163            0 :     Ok(())
     164            0 : }
     165              : 
     166              : /// Check `pg_ident.conf` and update if needed to allow databricks config.
     167            0 : pub fn update_pg_ident(pgdata_path: &Path, databricks_pg_ident: Option<&String>) -> Result<()> {
     168            0 :     info!("checking pg_ident.conf");
     169            0 :     let pghba_path = pgdata_path.join("pg_ident.conf");
     170              : 
     171              :     // Update pg_ident to contain databricks-specific settings
     172            0 :     if let Some(databricks_pg_ident) = databricks_pg_ident {
     173            0 :         if config::line_in_file(
     174            0 :             &pghba_path,
     175            0 :             &format!("include_if_exists {}\n", *databricks_pg_ident),
     176            0 :         )? {
     177            0 :             info!("updated pg_ident.conf to include databricks_pg_ident.conf");
     178              :         } else {
     179            0 :             info!("pg_ident.conf already included databricks_pg_ident.conf");
     180              :         }
     181            0 :     }
     182              : 
     183            0 :     Ok(())
     184            0 : }
     185              : 
     186              : /// Copy tls key_file and cert_file from k8s secret mount directory
     187              : /// to pgdata and set private key file permissions as expected by Postgres.
     188              : /// See this doc for expected permission <https://www.postgresql.org/docs/current/ssl-tcp.html>
     189              : /// K8s secrets mount on dblet does not honor permission and ownership
     190              : /// specified in the Volume or VolumeMount. So we need to explicitly copy the file and set the permissions.
     191            0 : pub fn copy_tls_certificates(
     192            0 :     key_file: &String,
     193            0 :     cert_file: &String,
     194            0 :     pgdata_path: &Path,
     195            0 : ) -> Result<()> {
     196            0 :     let files = [cert_file, key_file];
     197            0 :     for file in files.iter() {
     198            0 :         let source = Path::new(file);
     199            0 :         let dest = pgdata_path.join(source.file_name().unwrap());
     200            0 :         if !dest.exists() {
     201            0 :             std::fs::copy(source, &dest)?;
     202            0 :             info!(
     203            0 :                 "Copying tls file: {} to {}",
     204            0 :                 &source.display(),
     205            0 :                 &dest.display()
     206              :             );
     207            0 :         }
     208            0 :         if *file == key_file {
     209              :             // Postgres requires private key to be readable only by the owner by having
     210              :             // chmod 600 permissions.
     211            0 :             let permissions = Permissions::from_mode(0o600);
     212            0 :             fs::set_permissions(&dest, permissions)?;
     213            0 :             info!("Setting permission on {}.", &dest.display());
     214            0 :         }
     215              :     }
     216            0 :     Ok(())
     217            0 : }
     218              : 
     219              : /// Create a standby.signal file
     220            0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
     221              :     // XXX: consider making it a part of config.json
     222            0 :     let signalfile = pgdata_path.join("standby.signal");
     223              : 
     224            0 :     if !signalfile.exists() {
     225            0 :         File::create(signalfile)?;
     226            0 :         info!("created standby.signal");
     227              :     } else {
     228            0 :         info!("reused pre-existing standby.signal");
     229              :     }
     230            0 :     Ok(())
     231            0 : }
     232              : 
     233              : #[instrument(skip_all)]
     234              : pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
     235              :     let query = "ALTER EXTENSION neon UPDATE";
     236              :     info!("update neon extension version with query: {}", query);
     237              :     client.simple_query(query).await?;
     238              : 
     239              :     Ok(())
     240              : }
     241              : 
     242              : #[instrument(skip_all)]
     243              : pub async fn handle_migrations(
     244              :     params: ComputeNodeParams,
     245              :     client: &mut Client,
     246              :     lakebase_mode: bool,
     247              : ) -> Result<()> {
     248              :     info!("handle migrations");
     249              : 
     250              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     251              :     // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
     252              :     // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     253              : 
     254              :     // Add new migrations in numerical order.
     255              :     let migrations = [
     256              :         &format!(
     257              :             include_str!("./migrations/0001-add_bypass_rls_to_privileged_role.sql"),
     258              :             privileged_role_name = params.privileged_role_name
     259              :         ),
     260              :         &format!(
     261              :             include_str!("./migrations/0002-alter_roles.sql"),
     262              :             privileged_role_name = params.privileged_role_name
     263              :         ),
     264              :         &format!(
     265              :             include_str!("./migrations/0003-grant_pg_create_subscription_to_privileged_role.sql"),
     266              :             privileged_role_name = params.privileged_role_name
     267              :         ),
     268              :         &format!(
     269              :             include_str!("./migrations/0004-grant_pg_monitor_to_privileged_role.sql"),
     270              :             privileged_role_name = params.privileged_role_name
     271              :         ),
     272              :         &format!(
     273              :             include_str!("./migrations/0005-grant_all_on_tables_to_privileged_role.sql"),
     274              :             privileged_role_name = params.privileged_role_name
     275              :         ),
     276              :         &format!(
     277              :             include_str!("./migrations/0006-grant_all_on_sequences_to_privileged_role.sql"),
     278              :             privileged_role_name = params.privileged_role_name
     279              :         ),
     280              :         &format!(
     281              :             include_str!(
     282              :                 "./migrations/0007-grant_all_on_tables_with_grant_option_to_privileged_role.sql"
     283              :             ),
     284              :             privileged_role_name = params.privileged_role_name
     285              :         ),
     286              :         &format!(
     287              :             include_str!(
     288              :                 "./migrations/0008-grant_all_on_sequences_with_grant_option_to_privileged_role.sql"
     289              :             ),
     290              :             privileged_role_name = params.privileged_role_name
     291              :         ),
     292              :         include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
     293              :         &format!(
     294              :             include_str!(
     295              :                 "./migrations/0010-grant_snapshot_synchronization_funcs_to_privileged_role.sql"
     296              :             ),
     297              :             privileged_role_name = params.privileged_role_name
     298              :         ),
     299              :         &format!(
     300              :             include_str!(
     301              :                 "./migrations/0011-grant_pg_show_replication_origin_status_to_privileged_role.sql"
     302              :             ),
     303              :             privileged_role_name = params.privileged_role_name
     304              :         ),
     305              :         &format!(
     306              :             include_str!("./migrations/0012-grant_pg_signal_backend_to_privileged_role.sql"),
     307              :             privileged_role_name = params.privileged_role_name
     308              :         ),
     309              :     ];
     310              : 
     311              :     MigrationRunner::new(client, &migrations, lakebase_mode)
     312              :         .run_migrations()
     313              :         .await?;
     314              : 
     315              :     Ok(())
     316              : }
        

Generated by: LCOV version 2.1-beta