LCOV - code coverage report
Current view: top level - compute_tools/src - configurator.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 0.0 % 20 0
Test Date: 2025-07-26 17:20:05 Functions: 0.0 % 4 0

            Line data    Source code
       1              : use std::fs::File;
       2              : use std::thread;
       3              : use std::{path::Path, sync::Arc};
       4              : 
       5              : use anyhow::Result;
       6              : use compute_api::responses::{ComputeConfig, ComputeStatus};
       7              : use tracing::{error, info, instrument};
       8              : 
       9              : use crate::compute::{ComputeNode, ParsedSpec};
      10              : use crate::spec::get_config_from_control_plane;
      11              : 
      12              : #[instrument(skip_all)]
      13              : fn configurator_main_loop(compute: &Arc<ComputeNode>) {
      14              :     info!("waiting for reconfiguration requests");
      15              :     loop {
      16              :         let mut state = compute.state.lock().unwrap();
      17              :         /* BEGIN_HADRON */
      18              :         // RefreshConfiguration should only be used inside the loop
      19              :         assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
      20              :         /* END_HADRON */
      21              : 
      22              :         if compute.params.lakebase_mode {
      23              :             while state.status != ComputeStatus::ConfigurationPending
      24              :                 && state.status != ComputeStatus::RefreshConfigurationPending
      25              :                 && state.status != ComputeStatus::Failed
      26              :             {
      27              :                 info!("configurator: compute status: {:?}, sleeping", state.status);
      28              :                 state = compute.state_changed.wait(state).unwrap();
      29              :             }
      30              :         } else {
      31              :             // We have to re-check the status after re-acquiring the lock because it could be that
      32              :             // the status has changed while we were waiting for the lock, and we might not need to
      33              :             // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
      34              :             // we are waiting for a condition variable that will never be signaled.
      35              :             if state.status != ComputeStatus::ConfigurationPending {
      36              :                 state = compute.state_changed.wait(state).unwrap();
      37              :             }
      38              :         }
      39              : 
      40              :         // Re-check the status after waking up
      41              :         if state.status == ComputeStatus::ConfigurationPending {
      42              :             info!("got configuration request");
      43              :             state.set_status(ComputeStatus::Configuration, &compute.state_changed);
      44              :             drop(state);
      45              : 
      46              :             let mut new_status = ComputeStatus::Failed;
      47              :             if let Err(e) = compute.reconfigure() {
      48              :                 error!("could not configure compute node: {}", e);
      49              :             } else {
      50              :                 new_status = ComputeStatus::Running;
      51              :                 info!("compute node configured");
      52              :             }
      53              : 
      54              :             // XXX: used to test that API is blocking
      55              :             // std::thread::sleep(std::time::Duration::from_millis(10000));
      56              : 
      57              :             compute.set_status(new_status);
      58              :         } else if state.status == ComputeStatus::RefreshConfigurationPending {
      59              :             info!(
      60              :                 "compute node suspects its configuration is out of date, now refreshing configuration"
      61              :             );
      62              :             state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
      63              :             // Drop the lock guard here to avoid holding the lock while downloading config from the control plane / HCC.
      64              :             // This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
      65              :             // is safe to drop the lock like this.
      66              :             drop(state);
      67              : 
      68              :             let get_config_result: anyhow::Result<ComputeConfig> =
      69              :                 if let Some(config_path) = &compute.params.config_path_test_only {
      70              :                     // This path is only to make testing easier. In production we always get the config from the HCC.
      71              :                     info!(
      72              :                         "reloading config.json from path: {}",
      73              :                         config_path.to_string_lossy()
      74              :                     );
      75              :                     let path = Path::new(config_path);
      76              :                     if let Ok(file) = File::open(path) {
      77              :                         match serde_json::from_reader::<File, ComputeConfig>(file) {
      78              :                             Ok(config) => Ok(config),
      79              :                             Err(e) => {
      80              :                                 error!("could not parse config file: {}", e);
      81              :                                 Err(anyhow::anyhow!("could not parse config file: {}", e))
      82              :                             }
      83              :                         }
      84              :                     } else {
      85              :                         error!(
      86              :                             "could not open config file at path: {:?}",
      87              :                             config_path.to_string_lossy()
      88              :                         );
      89              :                         Err(anyhow::anyhow!(
      90              :                             "could not open config file at path: {}",
      91              :                             config_path.to_string_lossy()
      92              :                         ))
      93              :                     }
      94              :                 } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
      95              :                     get_config_from_control_plane(control_plane_uri, &compute.params.compute_id)
      96              :                 } else {
      97              :                     Err(anyhow::anyhow!("config_path_test_only is not set"))
      98              :                 };
      99              : 
     100              :             // Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
     101              :             let parsed_spec_result: Result<Option<ParsedSpec>> =
     102            0 :                 get_config_result.and_then(|config| {
     103            0 :                     if let Some(spec) = config.spec {
     104            0 :                         if let Ok(pspec) = ParsedSpec::try_from(spec) {
     105            0 :                             Ok(Some(pspec))
     106              :                         } else {
     107            0 :                             Err(anyhow::anyhow!("could not parse spec"))
     108              :                         }
     109              :                     } else {
     110            0 :                         Ok(None)
     111              :                     }
     112            0 :                 });
     113              : 
     114              :             let new_status: ComputeStatus;
     115              :             match parsed_spec_result {
     116              :                 // Control plane (HCM) returned a spec and we were able to parse it.
     117              :                 Ok(Some(pspec)) => {
     118              :                     {
     119              :                         let mut state = compute.state.lock().unwrap();
     120              :                         // Defensive programming to make sure this thread is indeed the only one that can move the compute
     121              :                         // node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
     122              :                         // into the type system.
     123              :                         assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
     124              : 
     125            0 :                         if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
     126              :                             == Some(pspec.pageserver_connstr.clone())
     127              :                         {
     128              :                             info!(
     129              :                                 "Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
     130              :                             );
     131              :                             state.status = ComputeStatus::Running;
     132              :                             compute.state_changed.notify_all();
     133              :                             drop(state);
     134              :                             std::thread::sleep(std::time::Duration::from_secs(5));
     135              :                             continue;
     136              :                         }
     137              :                         // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
     138              :                         // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
     139              :                         // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
     140              :                         // but it's not worth forking the codebase too much for this minor point alone right now.
     141              :                         state.pspec = Some(pspec);
     142              :                     }
     143              :                     match compute.reconfigure() {
     144              :                         Ok(_) => {
     145              :                             info!("Refresh configuration: compute node configured");
     146              :                             new_status = ComputeStatus::Running;
     147              :                         }
     148              :                         Err(e) => {
     149              :                             error!(
     150              :                                 "Refresh configuration: could not configure compute node: {}",
     151              :                                 e
     152              :                             );
     153              :                             // Set the compute node back to the `RefreshConfigurationPending` state if the configuration
     154              :                             // was not successful. It should be okay to treat this situation the same as if the loop
     155              :                             // hasn't executed yet as long as the detection side keeps notifying.
     156              :                             new_status = ComputeStatus::RefreshConfigurationPending;
     157              :                         }
     158              :                     }
     159              :                 }
     160              :                 // Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case.
     161              :                 Ok(None) => {
     162              :                     info!(
     163              :                         "Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
     164              :                     );
     165              :                     // We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
     166              :                     // clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
     167              :                     // configuration state).
     168              :                     std::process::exit(1);
     169              :                 }
     170              :                 // Various error cases:
     171              :                 // - The request to the control plane (HCM) either failed or returned a malformed spec.
     172              :                 // - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
     173              :                 Err(e) => {
     174              :                     error!(
     175              :                         "Refresh configuration: error getting a parsed spec: {:?}",
     176              :                         e
     177              :                     );
     178              :                     new_status = ComputeStatus::RefreshConfigurationPending;
     179              :                     // We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before
     180              :                     // retrying to avoid hammering the HCM.
     181              :                     std::thread::sleep(std::time::Duration::from_secs(5));
     182              :                 }
     183              :             }
     184              :             compute.set_status(new_status);
     185              :         } else if state.status == ComputeStatus::Failed {
     186              :             info!("compute node is now in Failed state, exiting");
     187              :             break;
     188              :         } else {
     189              :             info!("woken up for compute status: {:?}, sleeping", state.status);
     190              :         }
     191              :     }
     192              : }
     193              : 
     194            0 : pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     195            0 :     let compute = Arc::clone(compute);
     196              : 
     197            0 :     let runtime = tokio::runtime::Handle::current();
     198              : 
     199            0 :     thread::Builder::new()
     200            0 :         .name("compute-configurator".into())
     201            0 :         .spawn(move || {
     202            0 :             let _rt_guard = runtime.enter();
     203            0 :             configurator_main_loop(&compute);
     204            0 :             info!("configurator thread is exited");
     205            0 :         })
     206            0 :         .expect("cannot launch configurator thread")
     207            0 : }
        

Generated by: LCOV version 2.1-beta