Line data Source code
1 : use std::sync::Arc;
2 :
3 : use axum::body::Body;
4 : use axum::extract::State;
5 : use axum::response::Response;
6 : use compute_api::requests::{ConfigurationRequest, ConfigureTelemetryRequest};
7 : use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
8 : use compute_api::spec::ComputeFeature;
9 : use http::StatusCode;
10 : use tokio::task;
11 : use tracing::info;
12 :
13 : use crate::compute::{ComputeNode, ParsedSpec};
14 : use crate::http::JsonResponse;
15 : use crate::http::extract::Json;
16 : use crate::rsyslog::{PostgresLogsRsyslogConfig, configure_postgres_logs_export};
17 :
18 : // Accept spec in JSON format and request compute configuration. If anything
19 : // goes wrong after we set the compute status to `ConfigurationPending` and
20 : // update compute state with new spec, we basically leave compute in the
21 : // potentially wrong state. That said, it's control-plane's responsibility to
22 : // watch compute state after reconfiguration request and to clean restart in
23 : // case of errors.
24 0 : pub(in crate::http) async fn configure(
25 0 : State(compute): State<Arc<ComputeNode>>,
26 0 : request: Json<ConfigurationRequest>,
27 0 : ) -> Response {
28 0 : if !compute.params.live_config_allowed {
29 0 : return JsonResponse::error(
30 0 : StatusCode::PRECONDITION_FAILED,
31 0 : "live configuration is not allowed for this compute node".to_string(),
32 0 : );
33 0 : }
34 :
35 0 : let pspec = match ParsedSpec::try_from(request.spec.clone()) {
36 0 : Ok(p) => p,
37 0 : Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
38 : };
39 :
40 : // XXX: wrap state update under lock in a code block. Otherwise, we will try
41 : // to `Send` `mut state` into the spawned thread bellow, which will cause
42 : // the following rustc error:
43 : //
44 : // error: future cannot be sent between threads safely
45 : {
46 0 : let mut state = compute.state.lock().unwrap();
47 0 : if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
48 0 : return JsonResponse::invalid_status(state.status);
49 0 : }
50 0 :
51 0 : // Pass the tracing span to the main thread that performs the startup,
52 0 : // so that the start_compute operation is considered a child of this
53 0 : // configure request for tracing purposes.
54 0 : state.startup_span = Some(tracing::Span::current());
55 0 :
56 0 : state.pspec = Some(pspec);
57 0 : state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
58 0 : drop(state);
59 0 : }
60 0 :
61 0 : // Spawn a blocking thread to wait for compute to become Running. This is
62 0 : // needed to not block the main pool of workers and to be able to serve
63 0 : // other requests while some particular request is waiting for compute to
64 0 : // finish configuration.
65 0 : let c = compute.clone();
66 0 : let completed = task::spawn_blocking(move || {
67 0 : let mut state = c.state.lock().unwrap();
68 0 : while state.status != ComputeStatus::Running {
69 0 : state = c.state_changed.wait(state).unwrap();
70 0 : info!(
71 0 : "waiting for compute to become {}, current status: {}",
72 0 : ComputeStatus::Running,
73 0 : state.status
74 : );
75 :
76 0 : if state.status == ComputeStatus::Failed {
77 0 : let err = state.error.as_ref().map_or("unknown error", |x| x);
78 0 : let msg = format!("compute configuration failed: {:?}", err);
79 0 : return Err(msg);
80 0 : }
81 : }
82 :
83 0 : Ok(())
84 0 : })
85 0 : .await
86 0 : .unwrap();
87 :
88 0 : if let Err(e) = completed {
89 0 : return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
90 0 : }
91 0 :
92 0 : // Return current compute state if everything went well.
93 0 : let state = compute.state.lock().unwrap().clone();
94 0 : let body = ComputeStatusResponse::from(&state);
95 0 :
96 0 : JsonResponse::success(StatusCode::OK, body)
97 0 : }
98 :
99 0 : pub(in crate::http) async fn configure_telemetry(
100 0 : State(compute): State<Arc<ComputeNode>>,
101 0 : request: Json<ConfigureTelemetryRequest>,
102 0 : ) -> Response {
103 0 : if !compute.has_feature(ComputeFeature::PostgresLogsExport) {
104 0 : return JsonResponse::error(
105 0 : StatusCode::PRECONDITION_FAILED,
106 0 : "Postgres logs export feature is not enabled".to_string(),
107 0 : );
108 0 : }
109 0 :
110 0 : let conf = PostgresLogsRsyslogConfig::new(request.logs_export_host.as_deref());
111 0 : if let Err(err) = configure_postgres_logs_export(conf) {
112 0 : return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
113 0 : }
114 0 :
115 0 : Response::builder()
116 0 : .status(StatusCode::NO_CONTENT)
117 0 : .body(Body::from(""))
118 0 : .unwrap()
119 0 : }
|