Line data Source code
1 : use std::sync::Arc;
2 :
3 : use axum::{extract::State, response::Response};
4 : use compute_api::{
5 : requests::ConfigurationRequest,
6 : responses::{ComputeStatus, ComputeStatusResponse},
7 : };
8 : use http::StatusCode;
9 : use tokio::task;
10 : use tracing::info;
11 :
12 : use crate::{
13 : compute::{ComputeNode, ParsedSpec},
14 : http::{extract::Json, JsonResponse},
15 : };
16 :
17 : // Accept spec in JSON format and request compute configuration. If anything
18 : // goes wrong after we set the compute status to `ConfigurationPending` and
19 : // update compute state with new spec, we basically leave compute in the
20 : // potentially wrong state. That said, it's control-plane's responsibility to
21 : // watch compute state after reconfiguration request and to clean restart in
22 : // case of errors.
23 0 : pub(in crate::http) async fn configure(
24 0 : State(compute): State<Arc<ComputeNode>>,
25 0 : request: Json<ConfigurationRequest>,
26 0 : ) -> Response {
27 0 : if !compute.live_config_allowed {
28 0 : return JsonResponse::error(
29 0 : StatusCode::PRECONDITION_FAILED,
30 0 : "live configuration is not allowed for this compute node".to_string(),
31 0 : );
32 0 : }
33 :
34 0 : let pspec = match ParsedSpec::try_from(request.spec.clone()) {
35 0 : Ok(p) => p,
36 0 : Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
37 : };
38 :
39 : // XXX: wrap state update under lock in a code block. Otherwise, we will try
40 : // to `Send` `mut state` into the spawned thread bellow, which will cause
41 : // the following rustc error:
42 : //
43 : // error: future cannot be sent between threads safely
44 : {
45 0 : let mut state = compute.state.lock().unwrap();
46 0 : if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
47 0 : return JsonResponse::invalid_status(state.status);
48 0 : }
49 0 :
50 0 : state.pspec = Some(pspec);
51 0 : state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
52 0 : drop(state);
53 0 : }
54 0 :
55 0 : // Spawn a blocking thread to wait for compute to become Running. This is
56 0 : // needed to do not block the main pool of workers and be able to serve
57 0 : // other requests while some particular request is waiting for compute to
58 0 : // finish configuration.
59 0 : let c = compute.clone();
60 0 : let completed = task::spawn_blocking(move || {
61 0 : let mut state = c.state.lock().unwrap();
62 0 : while state.status != ComputeStatus::Running {
63 0 : state = c.state_changed.wait(state).unwrap();
64 0 : info!(
65 0 : "waiting for compute to become {}, current status: {}",
66 0 : ComputeStatus::Running,
67 0 : state.status
68 : );
69 :
70 0 : if state.status == ComputeStatus::Failed {
71 0 : let err = state.error.as_ref().map_or("unknown error", |x| x);
72 0 : let msg = format!("compute configuration failed: {:?}", err);
73 0 : return Err(msg);
74 0 : }
75 : }
76 :
77 0 : Ok(())
78 0 : })
79 0 : .await
80 0 : .unwrap();
81 :
82 0 : if let Err(e) = completed {
83 0 : return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
84 0 : }
85 0 :
86 0 : // Return current compute state if everything went well.
87 0 : let state = compute.state.lock().unwrap().clone();
88 0 : let body = ComputeStatusResponse::from(&state);
89 0 :
90 0 : JsonResponse::success(StatusCode::OK, body)
91 0 : }
|