LCOV - code coverage report
Current view: top level - compute_tools/src - configurator.rs (source / functions) Coverage Total Hit
Test: 5df5be44b39c188c09003ca1996842ae19a48b34.info Lines: 0.0 % 12 0
Test Date: 2025-07-24 01:13:26 Functions: 0.0 % 2 0

            Line data    Source code
       1              : use std::fs::File;
       2              : use std::thread;
       3              : use std::{path::Path, sync::Arc};
       4              : 
       5              : use compute_api::responses::{ComputeConfig, ComputeStatus};
       6              : use tracing::{error, info, instrument};
       7              : 
       8              : use crate::compute::{ComputeNode, ParsedSpec};
       9              : use crate::spec::get_config_from_control_plane;
      10              : 
      11              : #[instrument(skip_all)]
      12              : fn configurator_main_loop(compute: &Arc<ComputeNode>) {
      13              :     info!("waiting for reconfiguration requests");
      14              :     loop {
      15              :         let mut state = compute.state.lock().unwrap();
      16              : 
      17              :         if compute.params.lakebase_mode {
      18              :             while state.status != ComputeStatus::ConfigurationPending
      19              :                 && state.status != ComputeStatus::RefreshConfigurationPending
      20              :                 && state.status != ComputeStatus::Failed
      21              :             {
      22              :                 info!("configurator: compute status: {:?}, sleeping", state.status);
      23              :                 state = compute.state_changed.wait(state).unwrap();
      24              :             }
      25              :         } else {
      26              :             // We have to re-check the status after re-acquiring the lock because it could be that
      27              :             // the status has changed while we were waiting for the lock, and we might not need to
      28              :             // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
      29              :             // we are waiting for a condition variable that will never be signaled.
      30              :             if state.status != ComputeStatus::ConfigurationPending {
      31              :                 state = compute.state_changed.wait(state).unwrap();
      32              :             }
      33              :         }
      34              : 
      35              :         // Re-check the status after waking up
      36              :         if state.status == ComputeStatus::ConfigurationPending {
      37              :             info!("got configuration request");
      38              :             state.set_status(ComputeStatus::Configuration, &compute.state_changed);
      39              :             drop(state);
      40              : 
      41              :             let mut new_status = ComputeStatus::Failed;
      42              :             if let Err(e) = compute.reconfigure() {
      43              :                 error!("could not configure compute node: {}", e);
      44              :             } else {
      45              :                 new_status = ComputeStatus::Running;
      46              :                 info!("compute node configured");
      47              :             }
      48              : 
      49              :             // XXX: used to test that API is blocking
      50              :             // std::thread::sleep(std::time::Duration::from_millis(10000));
      51              : 
      52              :             compute.set_status(new_status);
      53              :         } else if state.status == ComputeStatus::RefreshConfigurationPending {
      54              :             info!(
      55              :                 "compute node suspects its configuration is out of date, now refreshing configuration"
      56              :             );
      57              :             // Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
      58              :             // This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it
      59              :             // is safe to drop the lock like this.
      60              :             drop(state);
      61              : 
      62              :             let spec = if let Some(config_path) = &compute.params.config_path_test_only {
      63              :                 // This path is only to make testing easier. In production we always get the spec from the HCC.
      64              :                 info!(
      65              :                     "reloading config.json from path: {}",
      66              :                     config_path.to_string_lossy()
      67              :                 );
      68              :                 let path = Path::new(config_path);
      69              :                 if let Ok(file) = File::open(path) {
      70              :                     match serde_json::from_reader::<File, ComputeConfig>(file) {
      71              :                         Ok(config) => config.spec,
      72              :                         Err(e) => {
      73              :                             error!("could not parse spec file: {}", e);
      74              :                             None
      75              :                         }
      76              :                     }
      77              :                 } else {
      78              :                     error!(
      79              :                         "could not open config file at path: {}",
      80              :                         config_path.to_string_lossy()
      81              :                     );
      82              :                     None
      83              :                 }
      84              :             } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
      85              :                 match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) {
      86              :                     Ok(config) => config.spec,
      87              :                     Err(e) => {
      88              :                         error!("could not get config from control plane: {}", e);
      89              :                         None
      90              :                     }
      91              :                 }
      92              :             } else {
      93              :                 None
      94              :             };
      95              : 
      96              :             if let Some(spec) = spec {
      97              :                 if let Ok(pspec) = ParsedSpec::try_from(spec) {
      98              :                     {
      99              :                         let mut state = compute.state.lock().unwrap();
     100              :                         // Defensive programming to make sure this thread is indeed the only one that can move the compute
     101              :                         // node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant
     102              :                         // into the type system.
     103              :                         assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
     104              :                         // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
     105              :                         // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
     106              :                         // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
     107              :                         // but it's not worth forking the codebase too much for this minor point alone right now.
     108              :                         state.pspec = Some(pspec);
     109              :                     }
     110              :                     match compute.reconfigure() {
     111              :                         Ok(_) => {
     112              :                             info!("Refresh configuration: compute node configured");
     113              :                             compute.set_status(ComputeStatus::Running);
     114              :                         }
     115              :                         Err(e) => {
     116              :                             error!(
     117              :                                 "Refresh configuration: could not configure compute node: {}",
     118              :                                 e
     119              :                             );
     120              :                             // Leave the compute node in the `RefreshConfigurationPending` state if the configuration
     121              :                             // was not successful. It should be okay to treat this situation the same as if the loop
     122              :                             // hasn't executed yet as long as the detection side keeps notifying.
     123              :                         }
     124              :                     }
     125              :                 }
     126              :             }
     127              :         } else if state.status == ComputeStatus::Failed {
     128              :             info!("compute node is now in Failed state, exiting");
     129              :             break;
     130              :         } else {
     131              :             info!("woken up for compute status: {:?}, sleeping", state.status);
     132              :         }
     133              :     }
     134              : }
     135              : 
     136            0 : pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     137            0 :     let compute = Arc::clone(compute);
     138              : 
     139            0 :     let runtime = tokio::runtime::Handle::current();
     140              : 
     141            0 :     thread::Builder::new()
     142            0 :         .name("compute-configurator".into())
     143            0 :         .spawn(move || {
     144            0 :             let _rt_guard = runtime.enter();
     145            0 :             configurator_main_loop(&compute);
     146            0 :             info!("configurator thread is exited");
     147            0 :         })
     148            0 :         .expect("cannot launch configurator thread")
     149            0 : }
        

Generated by: LCOV version 2.1-beta