LCOV - code coverage report
Current view: top level - compute_tools/src/http - api.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 0.0 % 429 0
Test Date: 2025-01-07 20:58:07 Functions: 0.0 % 26 0

            Line data    Source code
       1              : use std::convert::Infallible;
       2              : use std::net::IpAddr;
       3              : use std::net::Ipv6Addr;
       4              : use std::net::SocketAddr;
       5              : use std::sync::Arc;
       6              : use std::thread;
       7              : 
       8              : use crate::catalog::SchemaDumpError;
       9              : use crate::catalog::{get_database_schema, get_dbs_and_roles};
      10              : use crate::compute::forward_termination_signal;
      11              : use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
      12              : use crate::installed_extensions;
      13              : use compute_api::requests::{ConfigurationRequest, ExtensionInstallRequest, SetRoleGrantsRequest};
      14              : use compute_api::responses::{
      15              :     ComputeStatus, ComputeStatusResponse, ExtensionInstallResult, GenericAPIError,
      16              :     SetRoleGrantsResponse,
      17              : };
      18              : 
      19              : use anyhow::Result;
      20              : use hyper::header::CONTENT_TYPE;
      21              : use hyper::service::{make_service_fn, service_fn};
      22              : use hyper::{Body, Method, Request, Response, Server, StatusCode};
      23              : use metrics::proto::MetricFamily;
      24              : use metrics::Encoder;
      25              : use metrics::TextEncoder;
      26              : use tokio::task;
      27              : use tokio_util::sync::CancellationToken;
      28              : use tracing::{debug, error, info, warn};
      29              : use tracing_utils::http::OtelName;
      30              : use utils::failpoint_support::failpoints_handler;
      31              : use utils::http::error::ApiError;
      32              : use utils::http::request::must_get_query_param;
      33              : 
      34            0 : fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
      35            0 :     ComputeStatusResponse {
      36            0 :         start_time: state.start_time,
      37            0 :         tenant: state
      38            0 :             .pspec
      39            0 :             .as_ref()
      40            0 :             .map(|pspec| pspec.tenant_id.to_string()),
      41            0 :         timeline: state
      42            0 :             .pspec
      43            0 :             .as_ref()
      44            0 :             .map(|pspec| pspec.timeline_id.to_string()),
      45            0 :         status: state.status,
      46            0 :         last_active: state.last_active,
      47            0 :         error: state.error.clone(),
      48            0 :     }
      49            0 : }
      50              : 
      51              : // Service function to handle all available routes.
      52            0 : async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
      53            0 :     //
      54            0 :     // NOTE: The URI path is currently included in traces. That's OK because
      55            0 :     // it doesn't contain any variable parts or sensitive information. But
      56            0 :     // please keep that in mind if you change the routing here.
      57            0 :     //
      58            0 :     match (req.method(), req.uri().path()) {
      59              :         // Serialized compute state.
      60            0 :         (&Method::GET, "/status") => {
      61            0 :             debug!("serving /status GET request");
      62            0 :             let state = compute.state.lock().unwrap();
      63            0 :             let status_response = status_response_from_state(&state);
      64            0 :             Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
      65              :         }
      66              : 
      67              :         // Startup metrics in JSON format. Keep /metrics reserved for a possible
      68              :         // future use for Prometheus metrics format.
      69            0 :         (&Method::GET, "/metrics.json") => {
      70            0 :             info!("serving /metrics.json GET request");
      71            0 :             let metrics = compute.state.lock().unwrap().metrics.clone();
      72            0 :             Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
      73              :         }
      74              : 
      75              :         // Prometheus metrics
      76            0 :         (&Method::GET, "/metrics") => {
      77            0 :             debug!("serving /metrics GET request");
      78              : 
      79              :             // When we call TextEncoder::encode() below, it will immediately
      80              :             // return an error if a metric family has no metrics, so we need to
      81              :             // preemptively filter out metric families with no metrics.
      82            0 :             let metrics = installed_extensions::collect()
      83            0 :                 .into_iter()
      84            0 :                 .filter(|m| !m.get_metric().is_empty())
      85            0 :                 .collect::<Vec<MetricFamily>>();
      86            0 : 
      87            0 :             let encoder = TextEncoder::new();
      88            0 :             let mut buffer = vec![];
      89              : 
      90            0 :             if let Err(err) = encoder.encode(&metrics, &mut buffer) {
      91            0 :                 let msg = format!("error handling /metrics request: {err}");
      92            0 :                 error!(msg);
      93            0 :                 return render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR);
      94            0 :             }
      95            0 : 
      96            0 :             match Response::builder()
      97            0 :                 .status(StatusCode::OK)
      98            0 :                 .header(CONTENT_TYPE, encoder.format_type())
      99            0 :                 .body(Body::from(buffer))
     100              :             {
     101            0 :                 Ok(response) => response,
     102            0 :                 Err(err) => {
     103            0 :                     let msg = format!("error handling /metrics request: {err}");
     104            0 :                     error!(msg);
     105            0 :                     render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
     106              :                 }
     107              :             }
     108              :         }
     109              :         // Collect Postgres current usage insights
     110            0 :         (&Method::GET, "/insights") => {
     111            0 :             info!("serving /insights GET request");
     112            0 :             let status = compute.get_status();
     113            0 :             if status != ComputeStatus::Running {
     114            0 :                 let msg = format!("compute is not running, current status: {:?}", status);
     115            0 :                 error!(msg);
     116            0 :                 return Response::new(Body::from(msg));
     117            0 :             }
     118              : 
     119            0 :             let insights = compute.collect_insights().await;
     120            0 :             Response::new(Body::from(insights))
     121              :         }
     122              : 
     123            0 :         (&Method::POST, "/check_writability") => {
     124            0 :             info!("serving /check_writability POST request");
     125            0 :             let status = compute.get_status();
     126            0 :             if status != ComputeStatus::Running {
     127            0 :                 let msg = format!(
     128            0 :                     "invalid compute status for check_writability request: {:?}",
     129            0 :                     status
     130            0 :                 );
     131            0 :                 error!(msg);
     132            0 :                 return Response::new(Body::from(msg));
     133            0 :             }
     134              : 
     135            0 :             let res = crate::checker::check_writability(compute).await;
     136            0 :             match res {
     137            0 :                 Ok(_) => Response::new(Body::from("true")),
     138            0 :                 Err(e) => {
     139            0 :                     error!("check_writability failed: {}", e);
     140            0 :                     Response::new(Body::from(e.to_string()))
     141              :                 }
     142              :             }
     143              :         }
     144              : 
     145            0 :         (&Method::POST, "/extensions") => {
     146            0 :             info!("serving /extensions POST request");
     147            0 :             let status = compute.get_status();
     148            0 :             if status != ComputeStatus::Running {
     149            0 :                 let msg = format!(
     150            0 :                     "invalid compute status for extensions request: {:?}",
     151            0 :                     status
     152            0 :                 );
     153            0 :                 error!(msg);
     154            0 :                 return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
     155            0 :             }
     156              : 
     157            0 :             let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
     158            0 :             let request = serde_json::from_slice::<ExtensionInstallRequest>(&request).unwrap();
     159            0 :             let res = compute
     160            0 :                 .install_extension(&request.extension, &request.database, request.version)
     161            0 :                 .await;
     162            0 :             match res {
     163            0 :                 Ok(version) => render_json(Body::from(
     164            0 :                     serde_json::to_string(&ExtensionInstallResult {
     165            0 :                         extension: request.extension,
     166            0 :                         version,
     167            0 :                     })
     168            0 :                     .unwrap(),
     169            0 :                 )),
     170            0 :                 Err(e) => {
     171            0 :                     error!("install_extension failed: {}", e);
     172            0 :                     render_json_error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)
     173              :                 }
     174              :             }
     175              :         }
     176              : 
     177            0 :         (&Method::GET, "/info") => {
     178            0 :             let num_cpus = num_cpus::get_physical();
     179            0 :             info!("serving /info GET request. num_cpus: {}", num_cpus);
     180            0 :             Response::new(Body::from(
     181            0 :                 serde_json::json!({
     182            0 :                     "num_cpus": num_cpus,
     183            0 :                 })
     184            0 :                 .to_string(),
     185            0 :             ))
     186              :         }
     187              : 
     188              :         // Accept spec in JSON format and request compute configuration. If
     189              :         // anything goes wrong after we set the compute status to `ConfigurationPending`
     190              :         // and update compute state with new spec, we basically leave compute
     191              :         // in the potentially wrong state. That said, it's control-plane's
     192              :         // responsibility to watch compute state after reconfiguration request
     193              :         // and to clean restart in case of errors.
     194            0 :         (&Method::POST, "/configure") => {
     195            0 :             info!("serving /configure POST request");
     196            0 :             match handle_configure_request(req, compute).await {
     197            0 :                 Ok(msg) => Response::new(Body::from(msg)),
     198            0 :                 Err((msg, code)) => {
     199            0 :                     error!("error handling /configure request: {msg}");
     200            0 :                     render_json_error(&msg, code)
     201              :                 }
     202              :             }
     203              :         }
     204              : 
     205            0 :         (&Method::POST, "/terminate") => {
     206            0 :             info!("serving /terminate POST request");
     207            0 :             match handle_terminate_request(compute).await {
     208            0 :                 Ok(()) => Response::new(Body::empty()),
     209            0 :                 Err((msg, code)) => {
     210            0 :                     error!("error handling /terminate request: {msg}");
     211            0 :                     render_json_error(&msg, code)
     212              :                 }
     213              :             }
     214              :         }
     215              : 
     216            0 :         (&Method::GET, "/dbs_and_roles") => {
     217            0 :             info!("serving /dbs_and_roles GET request",);
     218            0 :             match get_dbs_and_roles(compute).await {
     219            0 :                 Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
     220              :                 Err(_) => {
     221            0 :                     render_json_error("can't get dbs and roles", StatusCode::INTERNAL_SERVER_ERROR)
     222              :                 }
     223              :             }
     224              :         }
     225              : 
     226            0 :         (&Method::GET, "/database_schema") => {
     227            0 :             let database = match must_get_query_param(&req, "database") {
     228            0 :                 Err(e) => return e.into_response(),
     229            0 :                 Ok(database) => database,
     230            0 :             };
     231            0 :             info!("serving /database_schema GET request with database: {database}",);
     232            0 :             match get_database_schema(compute, &database).await {
     233            0 :                 Ok(res) => render_plain(Body::wrap_stream(res)),
     234              :                 Err(SchemaDumpError::DatabaseDoesNotExist) => {
     235            0 :                     render_json_error("database does not exist", StatusCode::NOT_FOUND)
     236              :                 }
     237            0 :                 Err(e) => {
     238            0 :                     error!("can't get schema dump: {}", e);
     239            0 :                     render_json_error("can't get schema dump", StatusCode::INTERNAL_SERVER_ERROR)
     240              :                 }
     241              :             }
     242              :         }
     243              : 
     244            0 :         (&Method::POST, "/grants") => {
     245            0 :             info!("serving /grants POST request");
     246            0 :             let status = compute.get_status();
     247            0 :             if status != ComputeStatus::Running {
     248            0 :                 let msg = format!(
     249            0 :                     "invalid compute status for set_role_grants request: {:?}",
     250            0 :                     status
     251            0 :                 );
     252            0 :                 error!(msg);
     253            0 :                 return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
     254            0 :             }
     255              : 
     256            0 :             let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
     257            0 :             let request = serde_json::from_slice::<SetRoleGrantsRequest>(&request).unwrap();
     258              : 
     259            0 :             let res = compute
     260            0 :                 .set_role_grants(
     261            0 :                     &request.database,
     262            0 :                     &request.schema,
     263            0 :                     &request.privileges,
     264            0 :                     &request.role,
     265            0 :                 )
     266            0 :                 .await;
     267            0 :             match res {
     268            0 :                 Ok(()) => render_json(Body::from(
     269            0 :                     serde_json::to_string(&SetRoleGrantsResponse {
     270            0 :                         database: request.database,
     271            0 :                         schema: request.schema,
     272            0 :                         role: request.role,
     273            0 :                         privileges: request.privileges,
     274            0 :                     })
     275            0 :                     .unwrap(),
     276            0 :                 )),
     277            0 :                 Err(e) => render_json_error(
     278            0 :                     &format!("could not grant role privileges to the schema: {e}"),
     279            0 :                     // TODO: can we filter on role/schema not found errors
     280            0 :                     // and return appropriate error code?
     281            0 :                     StatusCode::INTERNAL_SERVER_ERROR,
     282            0 :                 ),
     283              :             }
     284              :         }
     285              : 
     286              :         // get the list of installed extensions
     287              :         // currently only used in python tests
     288              :         // TODO: call it from cplane
     289            0 :         (&Method::GET, "/installed_extensions") => {
     290            0 :             info!("serving /installed_extensions GET request");
     291            0 :             let status = compute.get_status();
     292            0 :             if status != ComputeStatus::Running {
     293            0 :                 let msg = format!(
     294            0 :                     "invalid compute status for extensions request: {:?}",
     295            0 :                     status
     296            0 :                 );
     297            0 :                 error!(msg);
     298            0 :                 return Response::new(Body::from(msg));
     299            0 :             }
     300            0 : 
     301            0 :             let conf = compute.get_conn_conf(None);
     302            0 :             let res =
     303            0 :                 task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf))
     304            0 :                     .await
     305            0 :                     .unwrap();
     306            0 : 
     307            0 :             match res {
     308            0 :                 Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
     309            0 :                 Err(e) => render_json_error(
     310            0 :                     &format!("could not get list of installed extensions: {}", e),
     311            0 :                     StatusCode::INTERNAL_SERVER_ERROR,
     312            0 :                 ),
     313              :             }
     314              :         }
     315              : 
     316            0 :         (&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
     317            0 :             match failpoints_handler(req, CancellationToken::new()).await {
     318            0 :                 Ok(r) => r,
     319            0 :                 Err(ApiError::BadRequest(e)) => {
     320            0 :                     render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
     321              :                 }
     322              :                 Err(_) => {
     323            0 :                     render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
     324              :                 }
     325              :             }
     326              :         }
     327              : 
     328              :         // download extension files from remote extension storage on demand
     329            0 :         (&Method::POST, route) if route.starts_with("/extension_server/") => {
     330            0 :             info!("serving {:?} POST request", route);
     331            0 :             info!("req.uri {:?}", req.uri());
     332              : 
     333              :             // don't even try to download extensions
     334              :             // if no remote storage is configured
     335            0 :             if compute.ext_remote_storage.is_none() {
     336            0 :                 info!("no extensions remote storage configured");
     337            0 :                 let mut resp = Response::new(Body::from("no remote storage configured"));
     338            0 :                 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
     339            0 :                 return resp;
     340            0 :             }
     341            0 : 
     342            0 :             let mut is_library = false;
     343            0 :             if let Some(params) = req.uri().query() {
     344            0 :                 info!("serving {:?} POST request with params: {}", route, params);
     345            0 :                 if params == "is_library=true" {
     346            0 :                     is_library = true;
     347            0 :                 } else {
     348            0 :                     let mut resp = Response::new(Body::from("Wrong request parameters"));
     349            0 :                     *resp.status_mut() = StatusCode::BAD_REQUEST;
     350            0 :                     return resp;
     351              :                 }
     352            0 :             }
     353            0 :             let filename = route.split('/').last().unwrap().to_string();
     354            0 :             info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
     355              : 
     356              :             // get ext_name and path from spec
     357              :             // don't lock compute_state for too long
     358            0 :             let ext = {
     359            0 :                 let compute_state = compute.state.lock().unwrap();
     360            0 :                 let pspec = compute_state.pspec.as_ref().expect("spec must be set");
     361            0 :                 let spec = &pspec.spec;
     362            0 : 
     363            0 :                 // debug only
     364            0 :                 info!("spec: {:?}", spec);
     365              : 
     366            0 :                 let remote_extensions = match spec.remote_extensions.as_ref() {
     367            0 :                     Some(r) => r,
     368              :                     None => {
     369            0 :                         info!("no remote extensions spec was provided");
     370            0 :                         let mut resp = Response::new(Body::from("no remote storage configured"));
     371            0 :                         *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
     372            0 :                         return resp;
     373              :                     }
     374              :                 };
     375              : 
     376            0 :                 remote_extensions.get_ext(
     377            0 :                     &filename,
     378            0 :                     is_library,
     379            0 :                     &compute.build_tag,
     380            0 :                     &compute.pgversion,
     381            0 :                 )
     382            0 :             };
     383            0 : 
     384            0 :             match ext {
     385            0 :                 Ok((ext_name, ext_path)) => {
     386            0 :                     match compute.download_extension(ext_name, ext_path).await {
     387            0 :                         Ok(_) => Response::new(Body::from("OK")),
     388            0 :                         Err(e) => {
     389            0 :                             error!("extension download failed: {}", e);
     390            0 :                             let mut resp = Response::new(Body::from(e.to_string()));
     391            0 :                             *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
     392            0 :                             resp
     393              :                         }
     394              :                     }
     395              :                 }
     396            0 :                 Err(e) => {
     397            0 :                     warn!("extension download failed to find extension: {}", e);
     398            0 :                     let mut resp = Response::new(Body::from("failed to find file"));
     399            0 :                     *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
     400            0 :                     resp
     401              :                 }
     402              :             }
     403              :         }
     404              : 
     405              :         // Return the `404 Not Found` for any other routes.
     406              :         _ => {
     407            0 :             let mut not_found = Response::new(Body::from("404 Not Found"));
     408            0 :             *not_found.status_mut() = StatusCode::NOT_FOUND;
     409            0 :             not_found
     410              :         }
     411              :     }
     412            0 : }
     413              : 
     414            0 : async fn handle_configure_request(
     415            0 :     req: Request<Body>,
     416            0 :     compute: &Arc<ComputeNode>,
     417            0 : ) -> Result<String, (String, StatusCode)> {
     418            0 :     if !compute.live_config_allowed {
     419            0 :         return Err((
     420            0 :             "live configuration is not allowed for this compute node".to_string(),
     421            0 :             StatusCode::PRECONDITION_FAILED,
     422            0 :         ));
     423            0 :     }
     424              : 
     425            0 :     let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
     426            0 :     let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
     427            0 :     if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
     428            0 :         let spec = request.spec;
     429              : 
     430            0 :         let parsed_spec = match ParsedSpec::try_from(spec) {
     431            0 :             Ok(ps) => ps,
     432            0 :             Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
     433              :         };
     434              : 
     435              :         // XXX: wrap state update under lock in code blocks. Otherwise,
     436              :         // we will try to `Send` `mut state` into the spawned thread
     437              :         // bellow, which will cause error:
     438              :         // ```
     439              :         // error: future cannot be sent between threads safely
     440              :         // ```
     441              :         {
     442            0 :             let mut state = compute.state.lock().unwrap();
     443            0 :             if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
     444            0 :                 let msg = format!(
     445            0 :                     "invalid compute status for configuration request: {:?}",
     446            0 :                     state.status.clone()
     447            0 :                 );
     448            0 :                 return Err((msg, StatusCode::PRECONDITION_FAILED));
     449            0 :             }
     450            0 :             state.pspec = Some(parsed_spec);
     451            0 :             state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
     452            0 :             drop(state);
     453            0 :             info!("set new spec and notified waiters");
     454              :         }
     455              : 
     456              :         // Spawn a blocking thread to wait for compute to become Running.
     457              :         // This is needed to do not block the main pool of workers and
     458              :         // be able to serve other requests while some particular request
     459              :         // is waiting for compute to finish configuration.
     460            0 :         let c = compute.clone();
     461            0 :         task::spawn_blocking(move || {
     462            0 :             let mut state = c.state.lock().unwrap();
     463            0 :             while state.status != ComputeStatus::Running {
     464            0 :                 state = c.state_changed.wait(state).unwrap();
     465            0 :                 info!(
     466            0 :                     "waiting for compute to become Running, current status: {:?}",
     467            0 :                     state.status
     468              :                 );
     469              : 
     470            0 :                 if state.status == ComputeStatus::Failed {
     471            0 :                     let err = state.error.as_ref().map_or("unknown error", |x| x);
     472            0 :                     let msg = format!("compute configuration failed: {:?}", err);
     473            0 :                     return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
     474            0 :                 }
     475              :             }
     476              : 
     477            0 :             Ok(())
     478            0 :         })
     479            0 :         .await
     480            0 :         .unwrap()?;
     481              : 
     482              :         // Return current compute state if everything went well.
     483            0 :         let state = compute.state.lock().unwrap().clone();
     484            0 :         let status_response = status_response_from_state(&state);
     485            0 :         Ok(serde_json::to_string(&status_response).unwrap())
     486              :     } else {
     487            0 :         Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
     488              :     }
     489            0 : }
     490              : 
     491            0 : fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
     492            0 :     let error = GenericAPIError {
     493            0 :         error: e.to_string(),
     494            0 :     };
     495            0 :     Response::builder()
     496            0 :         .status(status)
     497            0 :         .header(CONTENT_TYPE, "application/json")
     498            0 :         .body(Body::from(serde_json::to_string(&error).unwrap()))
     499            0 :         .unwrap()
     500            0 : }
     501              : 
     502            0 : fn render_json(body: Body) -> Response<Body> {
     503            0 :     Response::builder()
     504            0 :         .header(CONTENT_TYPE, "application/json")
     505            0 :         .body(body)
     506            0 :         .unwrap()
     507            0 : }
     508              : 
     509            0 : fn render_plain(body: Body) -> Response<Body> {
     510            0 :     Response::builder()
     511            0 :         .header(CONTENT_TYPE, "text/plain")
     512            0 :         .body(body)
     513            0 :         .unwrap()
     514            0 : }
     515              : 
     516            0 : async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
     517            0 :     {
     518            0 :         let mut state = compute.state.lock().unwrap();
     519            0 :         if state.status == ComputeStatus::Terminated {
     520            0 :             return Ok(());
     521            0 :         }
     522            0 :         if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
     523            0 :             let msg = format!(
     524            0 :                 "invalid compute status for termination request: {}",
     525            0 :                 state.status
     526            0 :             );
     527            0 :             return Err((msg, StatusCode::PRECONDITION_FAILED));
     528            0 :         }
     529            0 :         state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
     530            0 :         drop(state);
     531            0 :     }
     532            0 : 
     533            0 :     forward_termination_signal();
     534            0 :     info!("sent signal and notified waiters");
     535              : 
     536              :     // Spawn a blocking thread to wait for compute to become Terminated.
     537              :     // This is needed to do not block the main pool of workers and
     538              :     // be able to serve other requests while some particular request
     539              :     // is waiting for compute to finish configuration.
     540            0 :     let c = compute.clone();
     541            0 :     task::spawn_blocking(move || {
     542            0 :         let mut state = c.state.lock().unwrap();
     543            0 :         while state.status != ComputeStatus::Terminated {
     544            0 :             state = c.state_changed.wait(state).unwrap();
     545            0 :             info!(
     546            0 :                 "waiting for compute to become {}, current status: {:?}",
     547            0 :                 ComputeStatus::Terminated,
     548            0 :                 state.status
     549              :             );
     550              :         }
     551              : 
     552            0 :         Ok(())
     553            0 :     })
     554            0 :     .await
     555            0 :     .unwrap()?;
     556            0 :     info!("terminated Postgres");
     557            0 :     Ok(())
     558            0 : }
     559              : 
     560              : // Main Hyper HTTP server function that runs it and blocks waiting on it forever.
     561              : #[tokio::main]
     562            0 : async fn serve(port: u16, state: Arc<ComputeNode>) {
     563            0 :     // this usually binds to both IPv4 and IPv6 on linux
     564            0 :     // see e.g. https://github.com/rust-lang/rust/pull/34440
     565            0 :     let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
     566            0 : 
     567            0 :     let make_service = make_service_fn(move |_conn| {
     568            0 :         let state = state.clone();
     569            0 :         async move {
     570            0 :             Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
     571            0 :                 let state = state.clone();
     572            0 :                 async move {
     573            0 :                     Ok::<_, Infallible>(
     574            0 :                         // NOTE: We include the URI path in the string. It
     575            0 :                         // doesn't contain any variable parts or sensitive
     576            0 :                         // information in this API.
     577            0 :                         tracing_utils::http::tracing_handler(
     578            0 :                             req,
     579            0 :                             |req| routes(req, &state),
     580            0 :                             OtelName::UriPath,
     581            0 :                         )
     582            0 :                         .await,
     583            0 :                     )
     584            0 :                 }
     585            0 :             }))
     586            0 :         }
     587            0 :     });
     588            0 : 
     589            0 :     info!("starting HTTP server on {}", addr);
     590            0 : 
     591            0 :     let server = Server::bind(&addr).serve(make_service);
     592            0 : 
     593            0 :     // Run this server forever
     594            0 :     if let Err(e) = server.await {
     595            0 :         error!("server error: {}", e);
     596            0 :     }
     597            0 : }
     598              : 
     599              : /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
     600            0 : pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
     601            0 :     let state = Arc::clone(state);
     602            0 : 
     603            0 :     Ok(thread::Builder::new()
     604            0 :         .name("http-endpoint".into())
     605            0 :         .spawn(move || serve(port, state))?)
     606            0 : }
        

Generated by: LCOV version 2.1-beta