LCOV - code coverage report
Current view: top level - compute_tools/src - configurator.rs (source / functions) Coverage Total Hit
Test: bd421c520964a8037e429c13001c8b5f00fa42ec.info Lines: 0.0 % 13 0
Test Date: 2025-07-24 16:21:20 Functions: 0.0 % 3 0

            Line data    Source code
       1              : use std::fs::File;
       2              : use std::thread;
       3              : use std::{path::Path, sync::Arc};
       4              : 
       5              : use compute_api::responses::{ComputeConfig, ComputeStatus};
       6              : use tracing::{error, info, instrument};
       7              : 
       8              : use crate::compute::{ComputeNode, ParsedSpec};
       9              : use crate::spec::get_config_from_control_plane;
      10              : 
      11              : #[instrument(skip_all)]
      12              : fn configurator_main_loop(compute: &Arc<ComputeNode>) {
      13              :     info!("waiting for reconfiguration requests");
      14              :     loop {
      15              :         let mut state = compute.state.lock().unwrap();
      16              : 
      17              :         if compute.params.lakebase_mode {
      18              :             while state.status != ComputeStatus::ConfigurationPending
      19              :                 && state.status != ComputeStatus::RefreshConfigurationPending
      20              :                 && state.status != ComputeStatus::Failed
      21              :             {
      22              :                 info!("configurator: compute status: {:?}, sleeping", state.status);
      23              :                 state = compute.state_changed.wait(state).unwrap();
      24              :             }
      25              :         } else {
      26              :             // We have to re-check the status after re-acquiring the lock because it could be that
      27              :             // the status has changed while we were waiting for the lock, and we might not need to
      28              :             // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
      29              :             // we are waiting for a condition variable that will never be signaled.
      30              :             if state.status != ComputeStatus::ConfigurationPending {
      31              :                 state = compute.state_changed.wait(state).unwrap();
      32              :             }
      33              :         }
      34              : 
      35              :         // Re-check the status after waking up
      36              :         if state.status == ComputeStatus::ConfigurationPending {
      37              :             info!("got configuration request");
      38              :             state.set_status(ComputeStatus::Configuration, &compute.state_changed);
      39              :             drop(state);
      40              : 
      41              :             let mut new_status = ComputeStatus::Failed;
      42              :             if let Err(e) = compute.reconfigure() {
      43              :                 error!("could not configure compute node: {}", e);
      44              :             } else {
      45              :                 new_status = ComputeStatus::Running;
      46              :                 info!("compute node configured");
      47              :             }
      48              : 
      49              :             // XXX: used to test that API is blocking
      50              :             // std::thread::sleep(std::time::Duration::from_millis(10000));
      51              : 
      52              :             compute.set_status(new_status);
      53              :         } else if state.status == ComputeStatus::RefreshConfigurationPending {
      54              :             info!(
      55              :                 "compute node suspects its configuration is out of date, now refreshing configuration"
      56              :             );
      57              :             // Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
      58              :             // This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it
      59              :             // is safe to drop the lock like this.
      60              :             drop(state);
      61              : 
      62              :             let spec = if let Some(config_path) = &compute.params.config_path_test_only {
      63              :                 // This path is only to make testing easier. In production we always get the spec from the HCC.
      64              :                 info!(
      65              :                     "reloading config.json from path: {}",
      66              :                     config_path.to_string_lossy()
      67              :                 );
      68              :                 let path = Path::new(config_path);
      69              :                 if let Ok(file) = File::open(path) {
      70              :                     match serde_json::from_reader::<File, ComputeConfig>(file) {
      71              :                         Ok(config) => config.spec,
      72              :                         Err(e) => {
      73              :                             error!("could not parse spec file: {}", e);
      74              :                             None
      75              :                         }
      76              :                     }
      77              :                 } else {
      78              :                     error!(
      79              :                         "could not open config file at path: {}",
      80              :                         config_path.to_string_lossy()
      81              :                     );
      82              :                     None
      83              :                 }
      84              :             } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
      85              :                 match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) {
      86              :                     Ok(config) => config.spec,
      87              :                     Err(e) => {
      88              :                         error!("could not get config from control plane: {}", e);
      89              :                         None
      90              :                     }
      91              :                 }
      92              :             } else {
      93              :                 None
      94              :             };
      95              : 
      96              :             if let Some(spec) = spec {
      97              :                 if let Ok(pspec) = ParsedSpec::try_from(spec) {
      98              :                     {
      99              :                         let mut state = compute.state.lock().unwrap();
     100              :                         // Defensive programming to make sure this thread is indeed the only one that can move the compute
     101              :                         // node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant
     102              :                         // into the type system.
     103              :                         assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
     104              : 
     105            0 :                         if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
     106              :                             == Some(pspec.pageserver_connstr.clone())
     107              :                         {
     108              :                             info!(
     109              :                                 "Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
     110              :                             );
     111              :                             state.status = ComputeStatus::Running;
     112              :                             compute.state_changed.notify_all();
     113              :                             drop(state);
     114              :                             std::thread::sleep(std::time::Duration::from_secs(5));
     115              :                             continue;
     116              :                         }
     117              :                         // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
     118              :                         // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
     119              :                         // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
     120              :                         // but it's not worth forking the codebase too much for this minor point alone right now.
     121              :                         state.pspec = Some(pspec);
     122              :                     }
     123              :                     match compute.reconfigure() {
     124              :                         Ok(_) => {
     125              :                             info!("Refresh configuration: compute node configured");
     126              :                             compute.set_status(ComputeStatus::Running);
     127              :                         }
     128              :                         Err(e) => {
     129              :                             error!(
     130              :                                 "Refresh configuration: could not configure compute node: {}",
     131              :                                 e
     132              :                             );
     133              :                             // Leave the compute node in the `RefreshConfigurationPending` state if the configuration
     134              :                             // was not successful. It should be okay to treat this situation the same as if the loop
     135              :                             // hasn't executed yet as long as the detection side keeps notifying.
     136              :                         }
     137              :                     }
     138              :                 }
     139              :             }
     140              :         } else if state.status == ComputeStatus::Failed {
     141              :             info!("compute node is now in Failed state, exiting");
     142              :             break;
     143              :         } else {
     144              :             info!("woken up for compute status: {:?}, sleeping", state.status);
     145              :         }
     146              :     }
     147              : }
     148              : 
     149            0 : pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     150            0 :     let compute = Arc::clone(compute);
     151              : 
     152            0 :     let runtime = tokio::runtime::Handle::current();
     153              : 
     154            0 :     thread::Builder::new()
     155            0 :         .name("compute-configurator".into())
     156            0 :         .spawn(move || {
     157            0 :             let _rt_guard = runtime.enter();
     158            0 :             configurator_main_loop(&compute);
     159            0 :             info!("configurator thread is exited");
     160            0 :         })
     161            0 :         .expect("cannot launch configurator thread")
     162            0 : }
        

Generated by: LCOV version 2.1-beta