LCOV - code coverage report
Current view: top level - compute_tools/src/http/routes - configure.rs (source / functions) Coverage Total Hit
Test: ae4948feae6a1d420c855050eb8c189119446a71.info Lines: 0.0 % 81 0
Test Date: 2025-03-18 18:33:46 Functions: 0.0 % 6 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use axum::body::Body;
       4              : use axum::extract::State;
       5              : use axum::response::Response;
       6              : use compute_api::requests::{ConfigurationRequest, ConfigureTelemetryRequest};
       7              : use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
       8              : use compute_api::spec::ComputeFeature;
       9              : use http::StatusCode;
      10              : use tokio::task;
      11              : use tracing::info;
      12              : 
      13              : use crate::compute::{ComputeNode, ParsedSpec};
      14              : use crate::http::JsonResponse;
      15              : use crate::http::extract::Json;
      16              : use crate::rsyslog::{PostgresLogsRsyslogConfig, configure_postgres_logs_export};
      17              : 
      18              : // Accept spec in JSON format and request compute configuration. If anything
      19              : // goes wrong after we set the compute status to `ConfigurationPending` and
      20              : // update compute state with new spec, we basically leave compute in the
      21              : // potentially wrong state. That said, it's control-plane's responsibility to
      22              : // watch compute state after reconfiguration request and to clean restart in
      23              : // case of errors.
                        //
                        // Flow of this handler, as written below:
                        // 1. Reject with 412 PRECONDITION_FAILED unless
                        //    `compute.params.live_config_allowed` is set.
                        // 2. Convert `request.spec` into a `ParsedSpec`; a conversion error is
                        //    returned as 400 BAD_REQUEST.
                        // 3. Under the `compute.state` lock: require status `Empty` or `Running`
                        //    (otherwise return `JsonResponse::invalid_status`), then record the
                        //    current tracing span and the parsed spec and set the status to
                        //    `ConfigurationPending`.
                        // 4. On a blocking task, wait on `compute.state_changed` until the status
                        //    is `Running`; a `Failed` status seen after a wakeup is returned as
                        //    500 INTERNAL_SERVER_ERROR.
                        // 5. On success, reply 200 OK with a `ComputeStatusResponse` built from a
                        //    clone of the current state.
      24            0 : pub(in crate::http) async fn configure(
      25            0 :     State(compute): State<Arc<ComputeNode>>,
      26            0 :     request: Json<ConfigurationRequest>,
      27            0 : ) -> Response {
      28            0 :     if !compute.params.live_config_allowed {
      29            0 :         return JsonResponse::error(
      30            0 :             StatusCode::PRECONDITION_FAILED,
      31            0 :             "live configuration is not allowed for this compute node".to_string(),
      32            0 :         );
      33            0 :     }
      34              : 
      35            0 :     let pspec = match ParsedSpec::try_from(request.spec.clone()) {
      36            0 :         Ok(p) => p,
      37            0 :         Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
      38              :     };
      39              : 
      40              :     // XXX: wrap state update under lock in a code block. Otherwise, we will try
      41              :     // to `Send` `mut state` into the spawned thread below, which will cause
      42              :     // the following rustc error:
      43              :     //
      44              :     // error: future cannot be sent between threads safely
      45              :     {
      46            0 :         let mut state = compute.state.lock().unwrap();
                                // Only an `Empty` or `Running` compute accepts a new spec; any other
                                // status is reported back to the caller unchanged.
      47            0 :         if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
      48            0 :             return JsonResponse::invalid_status(state.status);
      49            0 :         }
      50            0 : 
      51            0 :         // Pass the tracing span to the main thread that performs the startup,
      52            0 :         // so that the start_compute operation is considered a child of this
      53            0 :         // configure request for tracing purposes.
      54            0 :         state.startup_span = Some(tracing::Span::current());
      55            0 : 
      56            0 :         state.pspec = Some(pspec);
      57            0 :         state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
                                // NOTE(review): this explicit drop is redundant with the end of the
                                // enclosing block on the next line; it only makes the release visible.
      58            0 :         drop(state);
      59            0 :     }
      60            0 : 
      61            0 :     // Spawn a blocking thread to wait for compute to become Running. This is
      62            0 :     // needed to not block the main pool of workers and to be able to serve
      63            0 :     // other requests while some particular request is waiting for compute to
      64            0 :     // finish configuration.
      65            0 :     let c = compute.clone();
      66            0 :     let completed = task::spawn_blocking(move || {
      67            0 :         let mut state = c.state.lock().unwrap();
                                // NOTE(review): the `Failed` check below runs only after `wait`
                                // returns. If the status is already `Failed` when the lock is first
                                // taken here, the loop waits for a further notification on
                                // `state_changed` before noticing it — confirm that this ordering
                                // cannot occur, or check for `Failed` before waiting.
      68            0 :         while state.status != ComputeStatus::Running {
      69            0 :             state = c.state_changed.wait(state).unwrap();
      70            0 :             info!(
      71            0 :                 "waiting for compute to become {}, current status: {}",
      72            0 :                 ComputeStatus::Running,
      73            0 :                 state.status
      74              :             );
      75              : 
      76            0 :             if state.status == ComputeStatus::Failed {
                                        // Use a fixed placeholder when the state carries no error text.
                                        // The text is rendered with `{:?}` (Debug formatting).
      77            0 :                 let err = state.error.as_ref().map_or("unknown error", |x| x);
      78            0 :                 let msg = format!("compute configuration failed: {:?}", err);
      79            0 :                 return Err(msg);
      80            0 :             }
      81              :         }
      82              : 
      83            0 :         Ok(())
      84            0 :     })
                            // The `unwrap` after `.await` panics if joining the blocking task fails.
      85            0 :     .await
      86            0 :     .unwrap();
      87              : 
      88            0 :     if let Err(e) = completed {
      89            0 :         return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
      90            0 :     }
      91            0 : 
      92            0 :     // Return current compute state if everything went well.
      93            0 :     let state = compute.state.lock().unwrap().clone();
      94            0 :     let body = ComputeStatusResponse::from(&state);
      95            0 : 
      96            0 :     JsonResponse::success(StatusCode::OK, body)
      97            0 : }
      98              : 
                        // Apply the Postgres logs export settings carried by the request.
                        //
                        // Responses, as written below:
                        // - 412 PRECONDITION_FAILED when the compute does not have the
                        //   `ComputeFeature::PostgresLogsExport` feature;
                        // - 500 INTERNAL_SERVER_ERROR, with the error rendered via `to_string()`,
                        //   when `configure_postgres_logs_export` returns an error;
                        // - 204 NO_CONTENT with an empty body otherwise.
      99            0 : pub(in crate::http) async fn configure_telemetry(
     100            0 :     State(compute): State<Arc<ComputeNode>>,
     101            0 :     request: Json<ConfigureTelemetryRequest>,
     102            0 : ) -> Response {
     103            0 :     if !compute.has_feature(ComputeFeature::PostgresLogsExport) {
     104            0 :         return JsonResponse::error(
     105            0 :             StatusCode::PRECONDITION_FAILED,
     106            0 :             "Postgres logs export feature is not enabled".to_string(),
     107            0 :         );
     108            0 :     }
     109            0 : 
                            // The config is built from the request's `logs_export_host`, passed on as
                            // a borrowed optional value via `as_deref()`.
     110            0 :     let conf = PostgresLogsRsyslogConfig::new(request.logs_export_host.as_deref());
     111            0 :     if let Err(err) = configure_postgres_logs_export(conf) {
     112            0 :         return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
     113            0 :     }
     114            0 : 
                            // Success carries no payload, so the response is built by hand; the
                            // `unwrap` panics only if the builder rejects this status/body pair.
     115            0 :     Response::builder()
     116            0 :         .status(StatusCode::NO_CONTENT)
     117            0 :         .body(Body::from(""))
     118            0 :         .unwrap()
     119            0 : }
        

Generated by: LCOV version 2.1-beta