Line data Source code
1 : use std::convert::Infallible;
2 : use std::net::IpAddr;
3 : use std::net::Ipv6Addr;
4 : use std::net::SocketAddr;
5 : use std::sync::Arc;
6 : use std::thread;
7 :
8 : use crate::catalog::SchemaDumpError;
9 : use crate::catalog::{get_database_schema, get_dbs_and_roles};
10 : use crate::compute::forward_termination_signal;
11 : use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
12 : use crate::installed_extensions;
13 : use compute_api::requests::{ConfigurationRequest, ExtensionInstallRequest, SetRoleGrantsRequest};
14 : use compute_api::responses::{
15 : ComputeStatus, ComputeStatusResponse, ExtensionInstallResult, GenericAPIError,
16 : SetRoleGrantsResponse,
17 : };
18 :
19 : use anyhow::Result;
20 : use hyper::header::CONTENT_TYPE;
21 : use hyper::service::{make_service_fn, service_fn};
22 : use hyper::{Body, Method, Request, Response, Server, StatusCode};
23 : use metrics::proto::MetricFamily;
24 : use metrics::Encoder;
25 : use metrics::TextEncoder;
26 : use tokio::task;
27 : use tokio_util::sync::CancellationToken;
28 : use tracing::{debug, error, info, warn};
29 : use tracing_utils::http::OtelName;
30 : use utils::failpoint_support::failpoints_handler;
31 : use utils::http::error::ApiError;
32 : use utils::http::request::must_get_query_param;
33 :
34 0 : fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
35 0 : ComputeStatusResponse {
36 0 : start_time: state.start_time,
37 0 : tenant: state
38 0 : .pspec
39 0 : .as_ref()
40 0 : .map(|pspec| pspec.tenant_id.to_string()),
41 0 : timeline: state
42 0 : .pspec
43 0 : .as_ref()
44 0 : .map(|pspec| pspec.timeline_id.to_string()),
45 0 : status: state.status,
46 0 : last_active: state.last_active,
47 0 : error: state.error.clone(),
48 0 : }
49 0 : }
50 :
51 : // Service function to handle all available routes.
52 0 : async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
53 0 : //
54 0 : // NOTE: The URI path is currently included in traces. That's OK because
55 0 : // it doesn't contain any variable parts or sensitive information. But
56 0 : // please keep that in mind if you change the routing here.
57 0 : //
58 0 : match (req.method(), req.uri().path()) {
59 : // Serialized compute state.
60 0 : (&Method::GET, "/status") => {
61 0 : debug!("serving /status GET request");
62 0 : let state = compute.state.lock().unwrap();
63 0 : let status_response = status_response_from_state(&state);
64 0 : Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
65 : }
66 :
67 : // Startup metrics in JSON format. Keep /metrics reserved for a possible
68 : // future use for Prometheus metrics format.
69 0 : (&Method::GET, "/metrics.json") => {
70 0 : info!("serving /metrics.json GET request");
71 0 : let metrics = compute.state.lock().unwrap().metrics.clone();
72 0 : Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
73 : }
74 :
75 : // Prometheus metrics
76 0 : (&Method::GET, "/metrics") => {
77 0 : debug!("serving /metrics GET request");
78 :
79 : // When we call TextEncoder::encode() below, it will immediately
80 : // return an error if a metric family has no metrics, so we need to
81 : // preemptively filter out metric families with no metrics.
82 0 : let metrics = installed_extensions::collect()
83 0 : .into_iter()
84 0 : .filter(|m| !m.get_metric().is_empty())
85 0 : .collect::<Vec<MetricFamily>>();
86 0 :
87 0 : let encoder = TextEncoder::new();
88 0 : let mut buffer = vec![];
89 :
90 0 : if let Err(err) = encoder.encode(&metrics, &mut buffer) {
91 0 : let msg = format!("error handling /metrics request: {err}");
92 0 : error!(msg);
93 0 : return render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR);
94 0 : }
95 0 :
96 0 : match Response::builder()
97 0 : .status(StatusCode::OK)
98 0 : .header(CONTENT_TYPE, encoder.format_type())
99 0 : .body(Body::from(buffer))
100 : {
101 0 : Ok(response) => response,
102 0 : Err(err) => {
103 0 : let msg = format!("error handling /metrics request: {err}");
104 0 : error!(msg);
105 0 : render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
106 : }
107 : }
108 : }
109 : // Collect Postgres current usage insights
110 0 : (&Method::GET, "/insights") => {
111 0 : info!("serving /insights GET request");
112 0 : let status = compute.get_status();
113 0 : if status != ComputeStatus::Running {
114 0 : let msg = format!("compute is not running, current status: {:?}", status);
115 0 : error!(msg);
116 0 : return Response::new(Body::from(msg));
117 0 : }
118 :
119 0 : let insights = compute.collect_insights().await;
120 0 : Response::new(Body::from(insights))
121 : }
122 :
123 0 : (&Method::POST, "/check_writability") => {
124 0 : info!("serving /check_writability POST request");
125 0 : let status = compute.get_status();
126 0 : if status != ComputeStatus::Running {
127 0 : let msg = format!(
128 0 : "invalid compute status for check_writability request: {:?}",
129 0 : status
130 0 : );
131 0 : error!(msg);
132 0 : return Response::new(Body::from(msg));
133 0 : }
134 :
135 0 : let res = crate::checker::check_writability(compute).await;
136 0 : match res {
137 0 : Ok(_) => Response::new(Body::from("true")),
138 0 : Err(e) => {
139 0 : error!("check_writability failed: {}", e);
140 0 : Response::new(Body::from(e.to_string()))
141 : }
142 : }
143 : }
144 :
145 0 : (&Method::POST, "/extensions") => {
146 0 : info!("serving /extensions POST request");
147 0 : let status = compute.get_status();
148 0 : if status != ComputeStatus::Running {
149 0 : let msg = format!(
150 0 : "invalid compute status for extensions request: {:?}",
151 0 : status
152 0 : );
153 0 : error!(msg);
154 0 : return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
155 0 : }
156 :
157 0 : let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
158 0 : let request = serde_json::from_slice::<ExtensionInstallRequest>(&request).unwrap();
159 0 : let res = compute
160 0 : .install_extension(&request.extension, &request.database, request.version)
161 0 : .await;
162 0 : match res {
163 0 : Ok(version) => render_json(Body::from(
164 0 : serde_json::to_string(&ExtensionInstallResult {
165 0 : extension: request.extension,
166 0 : version,
167 0 : })
168 0 : .unwrap(),
169 0 : )),
170 0 : Err(e) => {
171 0 : error!("install_extension failed: {}", e);
172 0 : render_json_error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)
173 : }
174 : }
175 : }
176 :
177 0 : (&Method::GET, "/info") => {
178 0 : let num_cpus = num_cpus::get_physical();
179 0 : info!("serving /info GET request. num_cpus: {}", num_cpus);
180 0 : Response::new(Body::from(
181 0 : serde_json::json!({
182 0 : "num_cpus": num_cpus,
183 0 : })
184 0 : .to_string(),
185 0 : ))
186 : }
187 :
188 : // Accept spec in JSON format and request compute configuration. If
189 : // anything goes wrong after we set the compute status to `ConfigurationPending`
190 : // and update compute state with new spec, we basically leave compute
191 : // in the potentially wrong state. That said, it's control-plane's
192 : // responsibility to watch compute state after reconfiguration request
193 : // and to clean restart in case of errors.
194 0 : (&Method::POST, "/configure") => {
195 0 : info!("serving /configure POST request");
196 0 : match handle_configure_request(req, compute).await {
197 0 : Ok(msg) => Response::new(Body::from(msg)),
198 0 : Err((msg, code)) => {
199 0 : error!("error handling /configure request: {msg}");
200 0 : render_json_error(&msg, code)
201 : }
202 : }
203 : }
204 :
205 0 : (&Method::POST, "/terminate") => {
206 0 : info!("serving /terminate POST request");
207 0 : match handle_terminate_request(compute).await {
208 0 : Ok(()) => Response::new(Body::empty()),
209 0 : Err((msg, code)) => {
210 0 : error!("error handling /terminate request: {msg}");
211 0 : render_json_error(&msg, code)
212 : }
213 : }
214 : }
215 :
216 0 : (&Method::GET, "/dbs_and_roles") => {
217 0 : info!("serving /dbs_and_roles GET request",);
218 0 : match get_dbs_and_roles(compute).await {
219 0 : Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
220 : Err(_) => {
221 0 : render_json_error("can't get dbs and roles", StatusCode::INTERNAL_SERVER_ERROR)
222 : }
223 : }
224 : }
225 :
226 0 : (&Method::GET, "/database_schema") => {
227 0 : let database = match must_get_query_param(&req, "database") {
228 0 : Err(e) => return e.into_response(),
229 0 : Ok(database) => database,
230 0 : };
231 0 : info!("serving /database_schema GET request with database: {database}",);
232 0 : match get_database_schema(compute, &database).await {
233 0 : Ok(res) => render_plain(Body::wrap_stream(res)),
234 : Err(SchemaDumpError::DatabaseDoesNotExist) => {
235 0 : render_json_error("database does not exist", StatusCode::NOT_FOUND)
236 : }
237 0 : Err(e) => {
238 0 : error!("can't get schema dump: {}", e);
239 0 : render_json_error("can't get schema dump", StatusCode::INTERNAL_SERVER_ERROR)
240 : }
241 : }
242 : }
243 :
244 0 : (&Method::POST, "/grants") => {
245 0 : info!("serving /grants POST request");
246 0 : let status = compute.get_status();
247 0 : if status != ComputeStatus::Running {
248 0 : let msg = format!(
249 0 : "invalid compute status for set_role_grants request: {:?}",
250 0 : status
251 0 : );
252 0 : error!(msg);
253 0 : return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
254 0 : }
255 :
256 0 : let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
257 0 : let request = serde_json::from_slice::<SetRoleGrantsRequest>(&request).unwrap();
258 :
259 0 : let res = compute
260 0 : .set_role_grants(
261 0 : &request.database,
262 0 : &request.schema,
263 0 : &request.privileges,
264 0 : &request.role,
265 0 : )
266 0 : .await;
267 0 : match res {
268 0 : Ok(()) => render_json(Body::from(
269 0 : serde_json::to_string(&SetRoleGrantsResponse {
270 0 : database: request.database,
271 0 : schema: request.schema,
272 0 : role: request.role,
273 0 : privileges: request.privileges,
274 0 : })
275 0 : .unwrap(),
276 0 : )),
277 0 : Err(e) => render_json_error(
278 0 : &format!("could not grant role privileges to the schema: {e}"),
279 0 : // TODO: can we filter on role/schema not found errors
280 0 : // and return appropriate error code?
281 0 : StatusCode::INTERNAL_SERVER_ERROR,
282 0 : ),
283 : }
284 : }
285 :
286 : // get the list of installed extensions
287 : // currently only used in python tests
288 : // TODO: call it from cplane
289 0 : (&Method::GET, "/installed_extensions") => {
290 0 : info!("serving /installed_extensions GET request");
291 0 : let status = compute.get_status();
292 0 : if status != ComputeStatus::Running {
293 0 : let msg = format!(
294 0 : "invalid compute status for extensions request: {:?}",
295 0 : status
296 0 : );
297 0 : error!(msg);
298 0 : return Response::new(Body::from(msg));
299 0 : }
300 0 :
301 0 : let conf = compute.get_conn_conf(None);
302 0 : let res =
303 0 : task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf))
304 0 : .await
305 0 : .unwrap();
306 0 :
307 0 : match res {
308 0 : Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
309 0 : Err(e) => render_json_error(
310 0 : &format!("could not get list of installed extensions: {}", e),
311 0 : StatusCode::INTERNAL_SERVER_ERROR,
312 0 : ),
313 : }
314 : }
315 :
316 0 : (&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
317 0 : match failpoints_handler(req, CancellationToken::new()).await {
318 0 : Ok(r) => r,
319 0 : Err(ApiError::BadRequest(e)) => {
320 0 : render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
321 : }
322 : Err(_) => {
323 0 : render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
324 : }
325 : }
326 : }
327 :
328 : // download extension files from remote extension storage on demand
329 0 : (&Method::POST, route) if route.starts_with("/extension_server/") => {
330 0 : info!("serving {:?} POST request", route);
331 0 : info!("req.uri {:?}", req.uri());
332 :
333 : // don't even try to download extensions
334 : // if no remote storage is configured
335 0 : if compute.ext_remote_storage.is_none() {
336 0 : info!("no extensions remote storage configured");
337 0 : let mut resp = Response::new(Body::from("no remote storage configured"));
338 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
339 0 : return resp;
340 0 : }
341 0 :
342 0 : let mut is_library = false;
343 0 : if let Some(params) = req.uri().query() {
344 0 : info!("serving {:?} POST request with params: {}", route, params);
345 0 : if params == "is_library=true" {
346 0 : is_library = true;
347 0 : } else {
348 0 : let mut resp = Response::new(Body::from("Wrong request parameters"));
349 0 : *resp.status_mut() = StatusCode::BAD_REQUEST;
350 0 : return resp;
351 : }
352 0 : }
353 0 : let filename = route.split('/').last().unwrap().to_string();
354 0 : info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
355 :
356 : // get ext_name and path from spec
357 : // don't lock compute_state for too long
358 0 : let ext = {
359 0 : let compute_state = compute.state.lock().unwrap();
360 0 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
361 0 : let spec = &pspec.spec;
362 0 :
363 0 : // debug only
364 0 : info!("spec: {:?}", spec);
365 :
366 0 : let remote_extensions = match spec.remote_extensions.as_ref() {
367 0 : Some(r) => r,
368 : None => {
369 0 : info!("no remote extensions spec was provided");
370 0 : let mut resp = Response::new(Body::from("no remote storage configured"));
371 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
372 0 : return resp;
373 : }
374 : };
375 :
376 0 : remote_extensions.get_ext(
377 0 : &filename,
378 0 : is_library,
379 0 : &compute.build_tag,
380 0 : &compute.pgversion,
381 0 : )
382 0 : };
383 0 :
384 0 : match ext {
385 0 : Ok((ext_name, ext_path)) => {
386 0 : match compute.download_extension(ext_name, ext_path).await {
387 0 : Ok(_) => Response::new(Body::from("OK")),
388 0 : Err(e) => {
389 0 : error!("extension download failed: {}", e);
390 0 : let mut resp = Response::new(Body::from(e.to_string()));
391 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
392 0 : resp
393 : }
394 : }
395 : }
396 0 : Err(e) => {
397 0 : warn!("extension download failed to find extension: {}", e);
398 0 : let mut resp = Response::new(Body::from("failed to find file"));
399 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
400 0 : resp
401 : }
402 : }
403 : }
404 :
405 : // Return the `404 Not Found` for any other routes.
406 : _ => {
407 0 : let mut not_found = Response::new(Body::from("404 Not Found"));
408 0 : *not_found.status_mut() = StatusCode::NOT_FOUND;
409 0 : not_found
410 : }
411 : }
412 0 : }
413 :
414 0 : async fn handle_configure_request(
415 0 : req: Request<Body>,
416 0 : compute: &Arc<ComputeNode>,
417 0 : ) -> Result<String, (String, StatusCode)> {
418 0 : if !compute.live_config_allowed {
419 0 : return Err((
420 0 : "live configuration is not allowed for this compute node".to_string(),
421 0 : StatusCode::PRECONDITION_FAILED,
422 0 : ));
423 0 : }
424 :
425 0 : let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
426 0 : let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
427 0 : if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
428 0 : let spec = request.spec;
429 :
430 0 : let parsed_spec = match ParsedSpec::try_from(spec) {
431 0 : Ok(ps) => ps,
432 0 : Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
433 : };
434 :
435 : // XXX: wrap state update under lock in code blocks. Otherwise,
436 : // we will try to `Send` `mut state` into the spawned thread
437 : // bellow, which will cause error:
438 : // ```
439 : // error: future cannot be sent between threads safely
440 : // ```
441 : {
442 0 : let mut state = compute.state.lock().unwrap();
443 0 : if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
444 0 : let msg = format!(
445 0 : "invalid compute status for configuration request: {:?}",
446 0 : state.status.clone()
447 0 : );
448 0 : return Err((msg, StatusCode::PRECONDITION_FAILED));
449 0 : }
450 0 : state.pspec = Some(parsed_spec);
451 0 : state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
452 0 : drop(state);
453 0 : info!("set new spec and notified waiters");
454 : }
455 :
456 : // Spawn a blocking thread to wait for compute to become Running.
457 : // This is needed to do not block the main pool of workers and
458 : // be able to serve other requests while some particular request
459 : // is waiting for compute to finish configuration.
460 0 : let c = compute.clone();
461 0 : task::spawn_blocking(move || {
462 0 : let mut state = c.state.lock().unwrap();
463 0 : while state.status != ComputeStatus::Running {
464 0 : state = c.state_changed.wait(state).unwrap();
465 0 : info!(
466 0 : "waiting for compute to become Running, current status: {:?}",
467 0 : state.status
468 : );
469 :
470 0 : if state.status == ComputeStatus::Failed {
471 0 : let err = state.error.as_ref().map_or("unknown error", |x| x);
472 0 : let msg = format!("compute configuration failed: {:?}", err);
473 0 : return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
474 0 : }
475 : }
476 :
477 0 : Ok(())
478 0 : })
479 0 : .await
480 0 : .unwrap()?;
481 :
482 : // Return current compute state if everything went well.
483 0 : let state = compute.state.lock().unwrap().clone();
484 0 : let status_response = status_response_from_state(&state);
485 0 : Ok(serde_json::to_string(&status_response).unwrap())
486 : } else {
487 0 : Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
488 : }
489 0 : }
490 :
491 0 : fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
492 0 : let error = GenericAPIError {
493 0 : error: e.to_string(),
494 0 : };
495 0 : Response::builder()
496 0 : .status(status)
497 0 : .header(CONTENT_TYPE, "application/json")
498 0 : .body(Body::from(serde_json::to_string(&error).unwrap()))
499 0 : .unwrap()
500 0 : }
501 :
502 0 : fn render_json(body: Body) -> Response<Body> {
503 0 : Response::builder()
504 0 : .header(CONTENT_TYPE, "application/json")
505 0 : .body(body)
506 0 : .unwrap()
507 0 : }
508 :
509 0 : fn render_plain(body: Body) -> Response<Body> {
510 0 : Response::builder()
511 0 : .header(CONTENT_TYPE, "text/plain")
512 0 : .body(body)
513 0 : .unwrap()
514 0 : }
515 :
516 0 : async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
517 0 : {
518 0 : let mut state = compute.state.lock().unwrap();
519 0 : if state.status == ComputeStatus::Terminated {
520 0 : return Ok(());
521 0 : }
522 0 : if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
523 0 : let msg = format!(
524 0 : "invalid compute status for termination request: {}",
525 0 : state.status
526 0 : );
527 0 : return Err((msg, StatusCode::PRECONDITION_FAILED));
528 0 : }
529 0 : state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
530 0 : drop(state);
531 0 : }
532 0 :
533 0 : forward_termination_signal();
534 0 : info!("sent signal and notified waiters");
535 :
536 : // Spawn a blocking thread to wait for compute to become Terminated.
537 : // This is needed to do not block the main pool of workers and
538 : // be able to serve other requests while some particular request
539 : // is waiting for compute to finish configuration.
540 0 : let c = compute.clone();
541 0 : task::spawn_blocking(move || {
542 0 : let mut state = c.state.lock().unwrap();
543 0 : while state.status != ComputeStatus::Terminated {
544 0 : state = c.state_changed.wait(state).unwrap();
545 0 : info!(
546 0 : "waiting for compute to become {}, current status: {:?}",
547 0 : ComputeStatus::Terminated,
548 0 : state.status
549 : );
550 : }
551 :
552 0 : Ok(())
553 0 : })
554 0 : .await
555 0 : .unwrap()?;
556 0 : info!("terminated Postgres");
557 0 : Ok(())
558 0 : }
559 :
560 : // Main Hyper HTTP server function that runs it and blocks waiting on it forever.
561 : #[tokio::main]
562 0 : async fn serve(port: u16, state: Arc<ComputeNode>) {
563 0 : // this usually binds to both IPv4 and IPv6 on linux
564 0 : // see e.g. https://github.com/rust-lang/rust/pull/34440
565 0 : let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
566 0 :
567 0 : let make_service = make_service_fn(move |_conn| {
568 0 : let state = state.clone();
569 0 : async move {
570 0 : Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
571 0 : let state = state.clone();
572 0 : async move {
573 0 : Ok::<_, Infallible>(
574 0 : // NOTE: We include the URI path in the string. It
575 0 : // doesn't contain any variable parts or sensitive
576 0 : // information in this API.
577 0 : tracing_utils::http::tracing_handler(
578 0 : req,
579 0 : |req| routes(req, &state),
580 0 : OtelName::UriPath,
581 0 : )
582 0 : .await,
583 0 : )
584 0 : }
585 0 : }))
586 0 : }
587 0 : });
588 0 :
589 0 : info!("starting HTTP server on {}", addr);
590 0 :
591 0 : let server = Server::bind(&addr).serve(make_service);
592 0 :
593 0 : // Run this server forever
594 0 : if let Err(e) = server.await {
595 0 : error!("server error: {}", e);
596 0 : }
597 0 : }
598 :
599 : /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
600 0 : pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
601 0 : let state = Arc::clone(state);
602 0 :
603 0 : Ok(thread::Builder::new()
604 0 : .name("http-endpoint".into())
605 0 : .spawn(move || serve(port, state))?)
606 0 : }
|