Line data Source code
1 : use std::convert::Infallible;
2 : use std::net::IpAddr;
3 : use std::net::Ipv6Addr;
4 : use std::net::SocketAddr;
5 : use std::sync::Arc;
6 : use std::thread;
7 :
8 : use crate::compute::forward_termination_signal;
9 : use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
10 : use compute_api::requests::ConfigurationRequest;
11 : use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
12 :
13 : use anyhow::Result;
14 : use hyper::service::{make_service_fn, service_fn};
15 : use hyper::{Body, Method, Request, Response, Server, StatusCode};
16 : use tokio::task;
17 : use tracing::{error, info, warn};
18 : use tracing_utils::http::OtelName;
19 :
20 0 : fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
21 0 : ComputeStatusResponse {
22 0 : start_time: state.start_time,
23 0 : tenant: state
24 0 : .pspec
25 0 : .as_ref()
26 0 : .map(|pspec| pspec.tenant_id.to_string()),
27 0 : timeline: state
28 0 : .pspec
29 0 : .as_ref()
30 0 : .map(|pspec| pspec.timeline_id.to_string()),
31 0 : status: state.status,
32 0 : last_active: state.last_active,
33 0 : error: state.error.clone(),
34 0 : }
35 0 : }
36 :
37 : // Service function to handle all available routes.
38 0 : async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
39 0 : //
40 0 : // NOTE: The URI path is currently included in traces. That's OK because
41 0 : // it doesn't contain any variable parts or sensitive information. But
42 0 : // please keep that in mind if you change the routing here.
43 0 : //
44 0 : match (req.method(), req.uri().path()) {
45 : // Serialized compute state.
46 0 : (&Method::GET, "/status") => {
47 0 : info!("serving /status GET request");
48 0 : let state = compute.state.lock().unwrap();
49 0 : let status_response = status_response_from_state(&state);
50 0 : Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
51 : }
52 :
53 : // Startup metrics in JSON format. Keep /metrics reserved for a possible
54 : // future use for Prometheus metrics format.
55 0 : (&Method::GET, "/metrics.json") => {
56 0 : info!("serving /metrics.json GET request");
57 0 : let metrics = compute.state.lock().unwrap().metrics.clone();
58 0 : Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
59 : }
60 :
61 : // Collect Postgres current usage insights
62 0 : (&Method::GET, "/insights") => {
63 0 : info!("serving /insights GET request");
64 0 : let status = compute.get_status();
65 0 : if status != ComputeStatus::Running {
66 0 : let msg = format!("compute is not running, current status: {:?}", status);
67 0 : error!(msg);
68 0 : return Response::new(Body::from(msg));
69 0 : }
70 :
71 0 : let insights = compute.collect_insights().await;
72 0 : Response::new(Body::from(insights))
73 : }
74 :
75 0 : (&Method::POST, "/check_writability") => {
76 0 : info!("serving /check_writability POST request");
77 0 : let status = compute.get_status();
78 0 : if status != ComputeStatus::Running {
79 0 : let msg = format!(
80 0 : "invalid compute status for check_writability request: {:?}",
81 0 : status
82 0 : );
83 0 : error!(msg);
84 0 : return Response::new(Body::from(msg));
85 0 : }
86 :
87 0 : let res = crate::checker::check_writability(compute).await;
88 0 : match res {
89 0 : Ok(_) => Response::new(Body::from("true")),
90 0 : Err(e) => {
91 0 : error!("check_writability failed: {}", e);
92 0 : Response::new(Body::from(e.to_string()))
93 : }
94 : }
95 : }
96 :
97 0 : (&Method::GET, "/info") => {
98 0 : let num_cpus = num_cpus::get_physical();
99 0 : info!("serving /info GET request. num_cpus: {}", num_cpus);
100 0 : Response::new(Body::from(
101 0 : serde_json::json!({
102 0 : "num_cpus": num_cpus,
103 0 : })
104 0 : .to_string(),
105 0 : ))
106 : }
107 :
108 : // Accept spec in JSON format and request compute configuration. If
109 : // anything goes wrong after we set the compute status to `ConfigurationPending`
110 : // and update compute state with new spec, we basically leave compute
111 : // in the potentially wrong state. That said, it's control-plane's
112 : // responsibility to watch compute state after reconfiguration request
113 : // and to clean restart in case of errors.
114 0 : (&Method::POST, "/configure") => {
115 0 : info!("serving /configure POST request");
116 0 : match handle_configure_request(req, compute).await {
117 0 : Ok(msg) => Response::new(Body::from(msg)),
118 0 : Err((msg, code)) => {
119 0 : error!("error handling /configure request: {msg}");
120 0 : render_json_error(&msg, code)
121 : }
122 : }
123 : }
124 :
125 0 : (&Method::POST, "/terminate") => {
126 0 : info!("serving /terminate POST request");
127 0 : match handle_terminate_request(compute).await {
128 0 : Ok(()) => Response::new(Body::empty()),
129 0 : Err((msg, code)) => {
130 0 : error!("error handling /terminate request: {msg}");
131 0 : render_json_error(&msg, code)
132 : }
133 : }
134 : }
135 :
136 : // download extension files from remote extension storage on demand
137 0 : (&Method::POST, route) if route.starts_with("/extension_server/") => {
138 0 : info!("serving {:?} POST request", route);
139 0 : info!("req.uri {:?}", req.uri());
140 :
141 : // don't even try to download extensions
142 : // if no remote storage is configured
143 0 : if compute.ext_remote_storage.is_none() {
144 0 : info!("no extensions remote storage configured");
145 0 : let mut resp = Response::new(Body::from("no remote storage configured"));
146 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
147 0 : return resp;
148 0 : }
149 0 :
150 0 : let mut is_library = false;
151 0 : if let Some(params) = req.uri().query() {
152 0 : info!("serving {:?} POST request with params: {}", route, params);
153 0 : if params == "is_library=true" {
154 0 : is_library = true;
155 0 : } else {
156 0 : let mut resp = Response::new(Body::from("Wrong request parameters"));
157 0 : *resp.status_mut() = StatusCode::BAD_REQUEST;
158 0 : return resp;
159 : }
160 0 : }
161 0 : let filename = route.split('/').last().unwrap().to_string();
162 0 : info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
163 :
164 : // get ext_name and path from spec
165 : // don't lock compute_state for too long
166 0 : let ext = {
167 0 : let compute_state = compute.state.lock().unwrap();
168 0 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
169 0 : let spec = &pspec.spec;
170 0 :
171 0 : // debug only
172 0 : info!("spec: {:?}", spec);
173 :
174 0 : let remote_extensions = match spec.remote_extensions.as_ref() {
175 0 : Some(r) => r,
176 : None => {
177 0 : info!("no remote extensions spec was provided");
178 0 : let mut resp = Response::new(Body::from("no remote storage configured"));
179 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
180 0 : return resp;
181 : }
182 : };
183 :
184 0 : remote_extensions.get_ext(
185 0 : &filename,
186 0 : is_library,
187 0 : &compute.build_tag,
188 0 : &compute.pgversion,
189 0 : )
190 0 : };
191 0 :
192 0 : match ext {
193 0 : Ok((ext_name, ext_path)) => {
194 0 : match compute.download_extension(ext_name, ext_path).await {
195 0 : Ok(_) => Response::new(Body::from("OK")),
196 0 : Err(e) => {
197 0 : error!("extension download failed: {}", e);
198 0 : let mut resp = Response::new(Body::from(e.to_string()));
199 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
200 0 : resp
201 : }
202 : }
203 : }
204 0 : Err(e) => {
205 0 : warn!("extension download failed to find extension: {}", e);
206 0 : let mut resp = Response::new(Body::from("failed to find file"));
207 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
208 0 : resp
209 : }
210 : }
211 : }
212 :
213 : // Return the `404 Not Found` for any other routes.
214 : _ => {
215 0 : let mut not_found = Response::new(Body::from("404 Not Found"));
216 0 : *not_found.status_mut() = StatusCode::NOT_FOUND;
217 0 : not_found
218 : }
219 : }
220 0 : }
221 :
222 0 : async fn handle_configure_request(
223 0 : req: Request<Body>,
224 0 : compute: &Arc<ComputeNode>,
225 0 : ) -> Result<String, (String, StatusCode)> {
226 0 : if !compute.live_config_allowed {
227 0 : return Err((
228 0 : "live configuration is not allowed for this compute node".to_string(),
229 0 : StatusCode::PRECONDITION_FAILED,
230 0 : ));
231 0 : }
232 :
233 0 : let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
234 0 : let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
235 0 : if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
236 0 : let spec = request.spec;
237 :
238 0 : let parsed_spec = match ParsedSpec::try_from(spec) {
239 0 : Ok(ps) => ps,
240 0 : Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
241 : };
242 :
243 : // XXX: wrap state update under lock in code blocks. Otherwise,
244 : // we will try to `Send` `mut state` into the spawned thread
245 : // bellow, which will cause error:
246 : // ```
247 : // error: future cannot be sent between threads safely
248 : // ```
249 : {
250 0 : let mut state = compute.state.lock().unwrap();
251 0 : if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
252 0 : let msg = format!(
253 0 : "invalid compute status for configuration request: {:?}",
254 0 : state.status.clone()
255 0 : );
256 0 : return Err((msg, StatusCode::PRECONDITION_FAILED));
257 0 : }
258 0 : state.pspec = Some(parsed_spec);
259 0 : state.status = ComputeStatus::ConfigurationPending;
260 0 : compute.state_changed.notify_all();
261 0 : drop(state);
262 0 : info!("set new spec and notified waiters");
263 : }
264 :
265 : // Spawn a blocking thread to wait for compute to become Running.
266 : // This is needed to do not block the main pool of workers and
267 : // be able to serve other requests while some particular request
268 : // is waiting for compute to finish configuration.
269 0 : let c = compute.clone();
270 0 : task::spawn_blocking(move || {
271 0 : let mut state = c.state.lock().unwrap();
272 0 : while state.status != ComputeStatus::Running {
273 0 : state = c.state_changed.wait(state).unwrap();
274 0 : info!(
275 0 : "waiting for compute to become Running, current status: {:?}",
276 0 : state.status
277 : );
278 :
279 0 : if state.status == ComputeStatus::Failed {
280 0 : let err = state.error.as_ref().map_or("unknown error", |x| x);
281 0 : let msg = format!("compute configuration failed: {:?}", err);
282 0 : return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
283 0 : }
284 : }
285 :
286 0 : Ok(())
287 0 : })
288 0 : .await
289 0 : .unwrap()?;
290 :
291 : // Return current compute state if everything went well.
292 0 : let state = compute.state.lock().unwrap().clone();
293 0 : let status_response = status_response_from_state(&state);
294 0 : Ok(serde_json::to_string(&status_response).unwrap())
295 : } else {
296 0 : Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
297 : }
298 0 : }
299 :
300 0 : fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
301 0 : let error = GenericAPIError {
302 0 : error: e.to_string(),
303 0 : };
304 0 : Response::builder()
305 0 : .status(status)
306 0 : .body(Body::from(serde_json::to_string(&error).unwrap()))
307 0 : .unwrap()
308 0 : }
309 :
310 0 : async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
311 0 : {
312 0 : let mut state = compute.state.lock().unwrap();
313 0 : if state.status == ComputeStatus::Terminated {
314 0 : return Ok(());
315 0 : }
316 0 : if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
317 0 : let msg = format!(
318 0 : "invalid compute status for termination request: {:?}",
319 0 : state.status.clone()
320 0 : );
321 0 : return Err((msg, StatusCode::PRECONDITION_FAILED));
322 0 : }
323 0 : state.status = ComputeStatus::TerminationPending;
324 0 : compute.state_changed.notify_all();
325 0 : drop(state);
326 0 : }
327 0 : forward_termination_signal();
328 0 : info!("sent signal and notified waiters");
329 :
330 : // Spawn a blocking thread to wait for compute to become Terminated.
331 : // This is needed to do not block the main pool of workers and
332 : // be able to serve other requests while some particular request
333 : // is waiting for compute to finish configuration.
334 0 : let c = compute.clone();
335 0 : task::spawn_blocking(move || {
336 0 : let mut state = c.state.lock().unwrap();
337 0 : while state.status != ComputeStatus::Terminated {
338 0 : state = c.state_changed.wait(state).unwrap();
339 0 : info!(
340 0 : "waiting for compute to become Terminated, current status: {:?}",
341 0 : state.status
342 : );
343 : }
344 :
345 0 : Ok(())
346 0 : })
347 0 : .await
348 0 : .unwrap()?;
349 0 : info!("terminated Postgres");
350 0 : Ok(())
351 0 : }
352 :
353 : // Main Hyper HTTP server function that runs it and blocks waiting on it forever.
354 : #[tokio::main]
355 0 : async fn serve(port: u16, state: Arc<ComputeNode>) {
356 0 : // this usually binds to both IPv4 and IPv6 on linux
357 0 : // see e.g. https://github.com/rust-lang/rust/pull/34440
358 0 : let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
359 0 :
360 0 : let make_service = make_service_fn(move |_conn| {
361 0 : let state = state.clone();
362 0 : async move {
363 0 : Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
364 0 : let state = state.clone();
365 0 : async move {
366 0 : Ok::<_, Infallible>(
367 0 : // NOTE: We include the URI path in the string. It
368 0 : // doesn't contain any variable parts or sensitive
369 0 : // information in this API.
370 0 : tracing_utils::http::tracing_handler(
371 0 : req,
372 0 : |req| routes(req, &state),
373 0 : OtelName::UriPath,
374 0 : )
375 0 : .await,
376 0 : )
377 0 : }
378 0 : }))
379 0 : }
380 0 : });
381 0 :
382 0 : info!("starting HTTP server on {}", addr);
383 0 :
384 0 : let server = Server::bind(&addr).serve(make_service);
385 0 :
386 0 : // Run this server forever
387 0 : if let Err(e) = server.await {
388 0 : error!("server error: {}", e);
389 0 : }
390 0 : }
391 :
392 : /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
393 0 : pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
394 0 : let state = Arc::clone(state);
395 0 :
396 0 : Ok(thread::Builder::new()
397 0 : .name("http-endpoint".into())
398 0 : .spawn(move || serve(port, state))?)
399 0 : }
|