Line data Source code
1 : use std::convert::Infallible;
2 : use std::net::IpAddr;
3 : use std::net::Ipv6Addr;
4 : use std::net::SocketAddr;
5 : use std::sync::Arc;
6 : use std::thread;
7 :
8 : use crate::compute::forward_termination_signal;
9 : use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
10 : use compute_api::requests::ConfigurationRequest;
11 : use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
12 :
13 : use anyhow::Result;
14 : use hyper::service::{make_service_fn, service_fn};
15 : use hyper::{Body, Method, Request, Response, Server, StatusCode};
16 : use num_cpus;
17 : use serde_json;
18 : use tokio::task;
19 : use tracing::{error, info, warn};
20 : use tracing_utils::http::OtelName;
21 :
22 0 : fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
23 0 : ComputeStatusResponse {
24 0 : start_time: state.start_time,
25 0 : tenant: state
26 0 : .pspec
27 0 : .as_ref()
28 0 : .map(|pspec| pspec.tenant_id.to_string()),
29 0 : timeline: state
30 0 : .pspec
31 0 : .as_ref()
32 0 : .map(|pspec| pspec.timeline_id.to_string()),
33 0 : status: state.status,
34 0 : last_active: state.last_active,
35 0 : error: state.error.clone(),
36 0 : }
37 0 : }
38 :
39 : // Service function to handle all available routes.
40 0 : async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
41 0 : //
42 0 : // NOTE: The URI path is currently included in traces. That's OK because
43 0 : // it doesn't contain any variable parts or sensitive information. But
44 0 : // please keep that in mind if you change the routing here.
45 0 : //
46 0 : match (req.method(), req.uri().path()) {
47 : // Serialized compute state.
48 0 : (&Method::GET, "/status") => {
49 0 : info!("serving /status GET request");
50 0 : let state = compute.state.lock().unwrap();
51 0 : let status_response = status_response_from_state(&state);
52 0 : Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
53 : }
54 :
55 : // Startup metrics in JSON format. Keep /metrics reserved for a possible
56 : // future use for Prometheus metrics format.
57 0 : (&Method::GET, "/metrics.json") => {
58 0 : info!("serving /metrics.json GET request");
59 0 : let metrics = compute.state.lock().unwrap().metrics.clone();
60 0 : Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
61 : }
62 :
63 : // Collect Postgres current usage insights
64 0 : (&Method::GET, "/insights") => {
65 0 : info!("serving /insights GET request");
66 0 : let status = compute.get_status();
67 0 : if status != ComputeStatus::Running {
68 0 : let msg = format!("compute is not running, current status: {:?}", status);
69 0 : error!(msg);
70 0 : return Response::new(Body::from(msg));
71 0 : }
72 :
73 0 : let insights = compute.collect_insights().await;
74 0 : Response::new(Body::from(insights))
75 : }
76 :
77 0 : (&Method::POST, "/check_writability") => {
78 0 : info!("serving /check_writability POST request");
79 0 : let status = compute.get_status();
80 0 : if status != ComputeStatus::Running {
81 0 : let msg = format!(
82 0 : "invalid compute status for check_writability request: {:?}",
83 0 : status
84 0 : );
85 0 : error!(msg);
86 0 : return Response::new(Body::from(msg));
87 0 : }
88 :
89 0 : let res = crate::checker::check_writability(compute).await;
90 0 : match res {
91 0 : Ok(_) => Response::new(Body::from("true")),
92 0 : Err(e) => {
93 0 : error!("check_writability failed: {}", e);
94 0 : Response::new(Body::from(e.to_string()))
95 : }
96 : }
97 : }
98 :
99 0 : (&Method::GET, "/info") => {
100 0 : let num_cpus = num_cpus::get_physical();
101 0 : info!("serving /info GET request. num_cpus: {}", num_cpus);
102 0 : Response::new(Body::from(
103 0 : serde_json::json!({
104 0 : "num_cpus": num_cpus,
105 0 : })
106 0 : .to_string(),
107 0 : ))
108 : }
109 :
110 : // Accept spec in JSON format and request compute configuration. If
111 : // anything goes wrong after we set the compute status to `ConfigurationPending`
112 : // and update compute state with new spec, we basically leave compute
113 : // in the potentially wrong state. That said, it's control-plane's
114 : // responsibility to watch compute state after reconfiguration request
115 : // and to clean restart in case of errors.
116 0 : (&Method::POST, "/configure") => {
117 0 : info!("serving /configure POST request");
118 0 : match handle_configure_request(req, compute).await {
119 0 : Ok(msg) => Response::new(Body::from(msg)),
120 0 : Err((msg, code)) => {
121 0 : error!("error handling /configure request: {msg}");
122 0 : render_json_error(&msg, code)
123 : }
124 : }
125 : }
126 :
127 0 : (&Method::POST, "/terminate") => {
128 0 : info!("serving /terminate POST request");
129 0 : match handle_terminate_request(compute).await {
130 0 : Ok(()) => Response::new(Body::empty()),
131 0 : Err((msg, code)) => {
132 0 : error!("error handling /terminate request: {msg}");
133 0 : render_json_error(&msg, code)
134 : }
135 : }
136 : }
137 :
138 : // download extension files from remote extension storage on demand
139 0 : (&Method::POST, route) if route.starts_with("/extension_server/") => {
140 0 : info!("serving {:?} POST request", route);
141 0 : info!("req.uri {:?}", req.uri());
142 :
143 : // don't even try to download extensions
144 : // if no remote storage is configured
145 0 : if compute.ext_remote_storage.is_none() {
146 0 : info!("no extensions remote storage configured");
147 0 : let mut resp = Response::new(Body::from("no remote storage configured"));
148 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
149 0 : return resp;
150 0 : }
151 0 :
152 0 : let mut is_library = false;
153 0 : if let Some(params) = req.uri().query() {
154 0 : info!("serving {:?} POST request with params: {}", route, params);
155 0 : if params == "is_library=true" {
156 0 : is_library = true;
157 0 : } else {
158 0 : let mut resp = Response::new(Body::from("Wrong request parameters"));
159 0 : *resp.status_mut() = StatusCode::BAD_REQUEST;
160 0 : return resp;
161 : }
162 0 : }
163 0 : let filename = route.split('/').last().unwrap().to_string();
164 0 : info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
165 :
166 : // get ext_name and path from spec
167 : // don't lock compute_state for too long
168 0 : let ext = {
169 0 : let compute_state = compute.state.lock().unwrap();
170 0 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
171 0 : let spec = &pspec.spec;
172 :
173 : // debug only
174 0 : info!("spec: {:?}", spec);
175 :
176 0 : let remote_extensions = match spec.remote_extensions.as_ref() {
177 0 : Some(r) => r,
178 : None => {
179 0 : info!("no remote extensions spec was provided");
180 0 : let mut resp = Response::new(Body::from("no remote storage configured"));
181 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
182 0 : return resp;
183 : }
184 : };
185 :
186 0 : remote_extensions.get_ext(
187 0 : &filename,
188 0 : is_library,
189 0 : &compute.build_tag,
190 0 : &compute.pgversion,
191 0 : )
192 0 : };
193 0 :
194 0 : match ext {
195 0 : Ok((ext_name, ext_path)) => {
196 0 : match compute.download_extension(ext_name, ext_path).await {
197 0 : Ok(_) => Response::new(Body::from("OK")),
198 0 : Err(e) => {
199 0 : error!("extension download failed: {}", e);
200 0 : let mut resp = Response::new(Body::from(e.to_string()));
201 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
202 0 : resp
203 : }
204 : }
205 : }
206 0 : Err(e) => {
207 0 : warn!("extension download failed to find extension: {}", e);
208 0 : let mut resp = Response::new(Body::from("failed to find file"));
209 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
210 0 : resp
211 : }
212 : }
213 : }
214 :
215 : // Return the `404 Not Found` for any other routes.
216 : _ => {
217 0 : let mut not_found = Response::new(Body::from("404 Not Found"));
218 0 : *not_found.status_mut() = StatusCode::NOT_FOUND;
219 0 : not_found
220 : }
221 : }
222 0 : }
223 :
224 0 : async fn handle_configure_request(
225 0 : req: Request<Body>,
226 0 : compute: &Arc<ComputeNode>,
227 0 : ) -> Result<String, (String, StatusCode)> {
228 0 : if !compute.live_config_allowed {
229 0 : return Err((
230 0 : "live configuration is not allowed for this compute node".to_string(),
231 0 : StatusCode::PRECONDITION_FAILED,
232 0 : ));
233 0 : }
234 :
235 0 : let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
236 0 : let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
237 0 : if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
238 0 : let spec = request.spec;
239 :
240 0 : let parsed_spec = match ParsedSpec::try_from(spec) {
241 0 : Ok(ps) => ps,
242 0 : Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
243 : };
244 :
245 : // XXX: wrap state update under lock in code blocks. Otherwise,
246 : // we will try to `Send` `mut state` into the spawned thread
247 : // bellow, which will cause error:
248 : // ```
249 : // error: future cannot be sent between threads safely
250 : // ```
251 : {
252 0 : let mut state = compute.state.lock().unwrap();
253 0 : if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
254 0 : let msg = format!(
255 0 : "invalid compute status for configuration request: {:?}",
256 0 : state.status.clone()
257 0 : );
258 0 : return Err((msg, StatusCode::PRECONDITION_FAILED));
259 0 : }
260 0 : state.pspec = Some(parsed_spec);
261 0 : state.status = ComputeStatus::ConfigurationPending;
262 0 : compute.state_changed.notify_all();
263 0 : drop(state);
264 0 : info!("set new spec and notified waiters");
265 : }
266 :
267 : // Spawn a blocking thread to wait for compute to become Running.
268 : // This is needed to do not block the main pool of workers and
269 : // be able to serve other requests while some particular request
270 : // is waiting for compute to finish configuration.
271 0 : let c = compute.clone();
272 0 : task::spawn_blocking(move || {
273 0 : let mut state = c.state.lock().unwrap();
274 0 : while state.status != ComputeStatus::Running {
275 0 : state = c.state_changed.wait(state).unwrap();
276 0 : info!(
277 0 : "waiting for compute to become Running, current status: {:?}",
278 0 : state.status
279 0 : );
280 :
281 0 : if state.status == ComputeStatus::Failed {
282 0 : let err = state.error.as_ref().map_or("unknown error", |x| x);
283 0 : let msg = format!("compute configuration failed: {:?}", err);
284 0 : return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
285 0 : }
286 : }
287 :
288 0 : Ok(())
289 0 : })
290 0 : .await
291 0 : .unwrap()?;
292 :
293 : // Return current compute state if everything went well.
294 0 : let state = compute.state.lock().unwrap().clone();
295 0 : let status_response = status_response_from_state(&state);
296 0 : Ok(serde_json::to_string(&status_response).unwrap())
297 : } else {
298 0 : Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
299 : }
300 0 : }
301 :
302 0 : fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
303 0 : let error = GenericAPIError {
304 0 : error: e.to_string(),
305 0 : };
306 0 : Response::builder()
307 0 : .status(status)
308 0 : .body(Body::from(serde_json::to_string(&error).unwrap()))
309 0 : .unwrap()
310 0 : }
311 :
312 0 : async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
313 0 : {
314 0 : let mut state = compute.state.lock().unwrap();
315 0 : if state.status == ComputeStatus::Terminated {
316 0 : return Ok(());
317 0 : }
318 0 : if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
319 0 : let msg = format!(
320 0 : "invalid compute status for termination request: {:?}",
321 0 : state.status.clone()
322 0 : );
323 0 : return Err((msg, StatusCode::PRECONDITION_FAILED));
324 0 : }
325 0 : state.status = ComputeStatus::TerminationPending;
326 0 : compute.state_changed.notify_all();
327 0 : drop(state);
328 0 : }
329 0 : forward_termination_signal();
330 0 : info!("sent signal and notified waiters");
331 :
332 : // Spawn a blocking thread to wait for compute to become Terminated.
333 : // This is needed to do not block the main pool of workers and
334 : // be able to serve other requests while some particular request
335 : // is waiting for compute to finish configuration.
336 0 : let c = compute.clone();
337 0 : task::spawn_blocking(move || {
338 0 : let mut state = c.state.lock().unwrap();
339 0 : while state.status != ComputeStatus::Terminated {
340 0 : state = c.state_changed.wait(state).unwrap();
341 0 : info!(
342 0 : "waiting for compute to become Terminated, current status: {:?}",
343 0 : state.status
344 0 : );
345 : }
346 :
347 0 : Ok(())
348 0 : })
349 0 : .await
350 0 : .unwrap()?;
351 0 : info!("terminated Postgres");
352 0 : Ok(())
353 0 : }
354 :
355 : // Main Hyper HTTP server function that runs it and blocks waiting on it forever.
356 : #[tokio::main]
357 0 : async fn serve(port: u16, state: Arc<ComputeNode>) {
358 0 : // this usually binds to both IPv4 and IPv6 on linux
359 0 : // see e.g. https://github.com/rust-lang/rust/pull/34440
360 0 : let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
361 0 :
362 0 : let make_service = make_service_fn(move |_conn| {
363 0 : let state = state.clone();
364 0 : async move {
365 0 : Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
366 0 : let state = state.clone();
367 0 : async move {
368 0 : Ok::<_, Infallible>(
369 0 : // NOTE: We include the URI path in the string. It
370 0 : // doesn't contain any variable parts or sensitive
371 0 : // information in this API.
372 0 : tracing_utils::http::tracing_handler(
373 0 : req,
374 0 : |req| routes(req, &state),
375 0 : OtelName::UriPath,
376 0 : )
377 0 : .await,
378 0 : )
379 0 : }
380 0 : }))
381 0 : }
382 0 : });
383 0 :
384 0 : info!("starting HTTP server on {}", addr);
385 0 :
386 0 : let server = Server::bind(&addr).serve(make_service);
387 0 :
388 0 : // Run this server forever
389 0 : if let Err(e) = server.await {
390 0 : error!("server error: {}", e);
391 0 : }
392 0 : }
393 :
394 : /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
395 0 : pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
396 0 : let state = Arc::clone(state);
397 0 :
398 0 : Ok(thread::Builder::new()
399 0 : .name("http-endpoint".into())
400 0 : .spawn(move || serve(port, state))?)
401 0 : }
|