Line data Source code
1 : use crate::compute::{ComputeNode, forward_termination_signal};
2 : use crate::http::JsonResponse;
3 : use axum::extract::State;
4 : use axum::response::Response;
5 : use axum_extra::extract::OptionalQuery;
6 : use compute_api::responses::{ComputeStatus, TerminateMode, TerminateResponse};
7 : use http::StatusCode;
8 : use serde::Deserialize;
9 : use std::sync::Arc;
10 : use tokio::task;
11 : use tracing::info;
12 :
13 0 : #[derive(Deserialize, Default)]
14 : pub struct TerminateQuery {
15 : mode: TerminateMode,
16 : }
17 :
18 : /// Terminate the compute.
19 0 : pub(in crate::http) async fn terminate(
20 0 : State(compute): State<Arc<ComputeNode>>,
21 0 : OptionalQuery(terminate): OptionalQuery<TerminateQuery>,
22 0 : ) -> Response {
23 0 : let mode = terminate.unwrap_or_default().mode;
24 : {
25 0 : let mut state = compute.state.lock().unwrap();
26 0 : if state.status == ComputeStatus::Terminated {
27 0 : let response = TerminateResponse {
28 0 : lsn: state.terminate_flush_lsn,
29 0 : };
30 0 : return JsonResponse::success(StatusCode::CREATED, response);
31 0 : }
32 :
33 0 : if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
34 0 : return JsonResponse::invalid_status(state.status);
35 0 : }
36 0 : state.set_status(mode.into(), &compute.state_changed);
37 : }
38 :
39 0 : forward_termination_signal(false);
40 0 : info!("sent signal and notified waiters");
41 :
42 : // Spawn a blocking thread to wait for compute to become Terminated.
43 : // This is needed to do not block the main pool of workers and
44 : // be able to serve other requests while some particular request
45 : // is waiting for compute to finish configuration.
46 0 : let c = compute.clone();
47 0 : let lsn = task::spawn_blocking(move || {
48 0 : let mut state = c.state.lock().unwrap();
49 0 : while state.status != ComputeStatus::Terminated {
50 0 : state = c.state_changed.wait(state).unwrap();
51 0 : info!(
52 0 : "waiting for compute to become {}, current status: {:?}",
53 : ComputeStatus::Terminated,
54 0 : state.status
55 : );
56 : }
57 0 : state.terminate_flush_lsn
58 0 : })
59 0 : .await
60 0 : .unwrap();
61 0 : info!("terminated Postgres");
62 0 : JsonResponse::success(StatusCode::OK, TerminateResponse { lsn })
63 0 : }
|