LCOV - code coverage report
Current view: top level - compute_tools/src/http/routes - configure.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 0.0 % 60 0
Test Date: 2025-03-12 16:10:49 Functions: 0.0 % 4 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use axum::extract::State;
       4              : use axum::response::Response;
       5              : use compute_api::requests::ConfigurationRequest;
       6              : use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
       7              : use http::StatusCode;
       8              : use tokio::task;
       9              : use tracing::info;
      10              : 
      11              : use crate::compute::{ComputeNode, ParsedSpec};
      12              : use crate::http::JsonResponse;
      13              : use crate::http::extract::Json;
      14              : 
      15              : // Accept spec in JSON format and request compute configuration. If anything
      16              : // goes wrong after we set the compute status to `ConfigurationPending` and
      17              : // update compute state with new spec, we basically leave compute in the
      18              : // potentially wrong state. That said, it's control-plane's responsibility to
      19              : // watch compute state after reconfiguration request and to clean restart in
      20              : // case of errors.
      21            0 : pub(in crate::http) async fn configure(
      22            0 :     State(compute): State<Arc<ComputeNode>>,
      23            0 :     request: Json<ConfigurationRequest>,
      24            0 : ) -> Response {
      25            0 :     if !compute.params.live_config_allowed {
      26            0 :         return JsonResponse::error(
      27            0 :             StatusCode::PRECONDITION_FAILED,
      28            0 :             "live configuration is not allowed for this compute node".to_string(),
      29            0 :         );
      30            0 :     }
      31              : 
      32            0 :     let pspec = match ParsedSpec::try_from(request.spec.clone()) {
      33            0 :         Ok(p) => p,
      34            0 :         Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
      35              :     };
      36              : 
      37              :     // XXX: wrap state update under lock in a code block. Otherwise, we will try
      38              :     // to `Send` `mut state` into the spawned thread bellow, which will cause
      39              :     // the following rustc error:
      40              :     //
      41              :     // error: future cannot be sent between threads safely
      42              :     {
      43            0 :         let mut state = compute.state.lock().unwrap();
      44            0 :         if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
      45            0 :             return JsonResponse::invalid_status(state.status);
      46            0 :         }
      47            0 : 
      48            0 :         // Pass the tracing span to the main thread that performs the startup,
      49            0 :         // so that the start_compute operation is considered a child of this
      50            0 :         // configure request for tracing purposes.
      51            0 :         state.startup_span = Some(tracing::Span::current());
      52            0 : 
      53            0 :         state.pspec = Some(pspec);
      54            0 :         state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
      55            0 :         drop(state);
      56            0 :     }
      57            0 : 
      58            0 :     // Spawn a blocking thread to wait for compute to become Running. This is
      59            0 :     // needed to not block the main pool of workers and to be able to serve
      60            0 :     // other requests while some particular request is waiting for compute to
      61            0 :     // finish configuration.
      62            0 :     let c = compute.clone();
      63            0 :     let completed = task::spawn_blocking(move || {
      64            0 :         let mut state = c.state.lock().unwrap();
      65            0 :         while state.status != ComputeStatus::Running {
      66            0 :             state = c.state_changed.wait(state).unwrap();
      67            0 :             info!(
      68            0 :                 "waiting for compute to become {}, current status: {}",
      69            0 :                 ComputeStatus::Running,
      70            0 :                 state.status
      71              :             );
      72              : 
      73            0 :             if state.status == ComputeStatus::Failed {
      74            0 :                 let err = state.error.as_ref().map_or("unknown error", |x| x);
      75            0 :                 let msg = format!("compute configuration failed: {:?}", err);
      76            0 :                 return Err(msg);
      77            0 :             }
      78              :         }
      79              : 
      80            0 :         Ok(())
      81            0 :     })
      82            0 :     .await
      83            0 :     .unwrap();
      84              : 
      85            0 :     if let Err(e) = completed {
      86            0 :         return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
      87            0 :     }
      88            0 : 
      89            0 :     // Return current compute state if everything went well.
      90            0 :     let state = compute.state.lock().unwrap().clone();
      91            0 :     let body = ComputeStatusResponse::from(&state);
      92            0 : 
      93            0 :     JsonResponse::success(StatusCode::OK, body)
      94            0 : }
        

Generated by: LCOV version 2.1-beta