Line data Source code
1 : use std::sync::Arc;
2 :
3 : use axum::{
4 : extract::State,
5 : response::{IntoResponse, Response},
6 : };
7 : use compute_api::responses::ComputeStatus;
8 : use http::StatusCode;
9 : use tokio::task;
10 : use tracing::info;
11 :
12 : use crate::{
13 : compute::{forward_termination_signal, ComputeNode},
14 : http::JsonResponse,
15 : };
16 :
17 : /// Terminate the compute.
18 0 : pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>) -> Response {
19 0 : {
20 0 : let mut state = compute.state.lock().unwrap();
21 0 : if state.status == ComputeStatus::Terminated {
22 0 : return StatusCode::CREATED.into_response();
23 0 : }
24 :
25 0 : if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
26 0 : return JsonResponse::invalid_status(state.status);
27 0 : }
28 0 :
29 0 : state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
30 0 : drop(state);
31 0 : }
32 0 :
33 0 : forward_termination_signal();
34 0 : info!("sent signal and notified waiters");
35 :
36 : // Spawn a blocking thread to wait for compute to become Terminated.
37 : // This is needed to do not block the main pool of workers and
38 : // be able to serve other requests while some particular request
39 : // is waiting for compute to finish configuration.
40 0 : let c = compute.clone();
41 0 : task::spawn_blocking(move || {
42 0 : let mut state = c.state.lock().unwrap();
43 0 : while state.status != ComputeStatus::Terminated {
44 0 : state = c.state_changed.wait(state).unwrap();
45 0 : info!(
46 0 : "waiting for compute to become {}, current status: {:?}",
47 0 : ComputeStatus::Terminated,
48 0 : state.status
49 : );
50 : }
51 0 : })
52 0 : .await
53 0 : .unwrap();
54 0 :
55 0 : info!("terminated Postgres");
56 :
57 0 : StatusCode::OK.into_response()
58 0 : }
|