LCOV - differential code coverage report
Current view: top level - compute_tools/src/http - api.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 30.1 % 229 69 160 69
Current Date: 2023-10-19 02:04:12 Functions: 45.2 % 42 19 23 19
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::convert::Infallible;
       2                 : use std::net::IpAddr;
       3                 : use std::net::Ipv6Addr;
       4                 : use std::net::SocketAddr;
       5                 : use std::sync::Arc;
       6                 : use std::thread;
       7                 : 
       8                 : use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
       9                 : use compute_api::requests::ConfigurationRequest;
      10                 : use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
      11                 : 
      12                 : use anyhow::Result;
      13                 : use hyper::service::{make_service_fn, service_fn};
      14                 : use hyper::{Body, Method, Request, Response, Server, StatusCode};
      15                 : use num_cpus;
      16                 : use serde_json;
      17                 : use tokio::task;
      18                 : use tracing::{error, info, warn};
      19                 : use tracing_utils::http::OtelName;
      20                 : 
      21 CBC        1436 : fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
      22            1436 :     ComputeStatusResponse {
      23            1436 :         start_time: state.start_time,
      24            1436 :         tenant: state
      25            1436 :             .pspec
      26            1436 :             .as_ref()
      27            1436 :             .map(|pspec| pspec.tenant_id.to_string()),
      28            1436 :         timeline: state
      29            1436 :             .pspec
      30            1436 :             .as_ref()
      31            1436 :             .map(|pspec| pspec.timeline_id.to_string()),
      32            1436 :         status: state.status,
      33            1436 :         last_active: state.last_active,
      34            1436 :         error: state.error.clone(),
      35            1436 :     }
      36            1436 : }
      37                 : 
      38                 : // Service function to handle all available routes.
      39            1454 : async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
      40                 :     //
      41                 :     // NOTE: The URI path is currently included in traces. That's OK because
      42                 :     // it doesn't contain any variable parts or sensitive information. But
      43                 :     // please keep that in mind if you change the routing here.
      44                 :     //
      45            1454 :     match (req.method(), req.uri().path()) {
      46                 :         // Serialized compute state.
      47            1436 :         (&Method::GET, "/status") => {
      48            1436 :             info!("serving /status GET request");
      49            1436 :             let state = compute.state.lock().unwrap();
      50            1436 :             let status_response = status_response_from_state(&state);
      51            1436 :             Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
      52                 :         }
      53                 : 
      54                 :         // Startup metrics in JSON format. Keep /metrics reserved for a possible
      55                 :         // future use for Prometheus metrics format.
      56 UBC           0 :         (&Method::GET, "/metrics.json") => {
      57               0 :             info!("serving /metrics.json GET request");
      58               0 :             let metrics = compute.state.lock().unwrap().metrics.clone();
      59               0 :             Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
      60                 :         }
      61                 : 
      62                 :         // Collect Postgres current usage insights
      63               0 :         (&Method::GET, "/insights") => {
      64               0 :             info!("serving /insights GET request");
      65               0 :             let status = compute.get_status();
      66               0 :             if status != ComputeStatus::Running {
      67               0 :                 let msg = format!("compute is not running, current status: {:?}", status);
      68               0 :                 error!(msg);
      69               0 :                 return Response::new(Body::from(msg));
      70               0 :             }
      71                 : 
      72               0 :             let insights = compute.collect_insights().await;
      73               0 :             Response::new(Body::from(insights))
      74                 :         }
      75                 : 
      76 CBC          18 :         (&Method::POST, "/check_writability") => {
      77 UBC           0 :             info!("serving /check_writability POST request");
      78               0 :             let status = compute.get_status();
      79               0 :             if status != ComputeStatus::Running {
      80               0 :                 let msg = format!(
      81               0 :                     "invalid compute status for check_writability request: {:?}",
      82               0 :                     status
      83               0 :                 );
      84               0 :                 error!(msg);
      85               0 :                 return Response::new(Body::from(msg));
      86               0 :             }
      87                 : 
      88               0 :             let res = crate::checker::check_writability(compute).await;
      89               0 :             match res {
      90               0 :                 Ok(_) => Response::new(Body::from("true")),
      91               0 :                 Err(e) => {
      92               0 :                     error!("check_writability failed: {}", e);
      93               0 :                     Response::new(Body::from(e.to_string()))
      94                 :                 }
      95                 :             }
      96                 :         }
      97                 : 
      98               0 :         (&Method::GET, "/info") => {
      99               0 :             let num_cpus = num_cpus::get_physical();
     100               0 :             info!("serving /info GET request. num_cpus: {}", num_cpus);
     101               0 :             Response::new(Body::from(
     102               0 :                 serde_json::json!({
     103               0 :                     "num_cpus": num_cpus,
     104               0 :                 })
     105               0 :                 .to_string(),
     106               0 :             ))
     107                 :         }
     108                 : 
     109                 :         // Accept spec in JSON format and request compute configuration. If
     110                 :         // anything goes wrong after we set the compute status to `ConfigurationPending`
     111                 :         // and update compute state with new spec, we basically leave compute
     112                 :         // in the potentially wrong state. That said, it's control-plane's
     113                 :         // responsibility to watch compute state after reconfiguration request
     114                 :         // and to clean restart in case of errors.
     115 CBC          18 :         (&Method::POST, "/configure") => {
     116 UBC           0 :             info!("serving /configure POST request");
     117               0 :             match handle_configure_request(req, compute).await {
     118               0 :                 Ok(msg) => Response::new(Body::from(msg)),
     119               0 :                 Err((msg, code)) => {
     120               0 :                     error!("error handling /configure request: {msg}");
     121               0 :                     render_json_error(&msg, code)
     122                 :                 }
     123                 :             }
     124                 :         }
     125                 : 
     126                 :         // download extension files from S3 on demand
     127 CBC          18 :         (&Method::POST, route) if route.starts_with("/extension_server/") => {
     128              18 :             info!("serving {:?} POST request", route);
     129              18 :             info!("req.uri {:?}", req.uri());
     130                 : 
     131                 :             // don't even try to download extensions
     132                 :             // if no remote storage is configured
     133              18 :             if compute.ext_remote_storage.is_none() {
     134              18 :                 info!("no extensions remote storage configured");
     135              18 :                 let mut resp = Response::new(Body::from("no remote storage configured"));
     136              18 :                 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
     137              18 :                 return resp;
     138 UBC           0 :             }
     139               0 : 
     140               0 :             let mut is_library = false;
     141               0 :             if let Some(params) = req.uri().query() {
     142               0 :                 info!("serving {:?} POST request with params: {}", route, params);
     143               0 :                 if params == "is_library=true" {
     144               0 :                     is_library = true;
     145               0 :                 } else {
     146               0 :                     let mut resp = Response::new(Body::from("Wrong request parameters"));
     147               0 :                     *resp.status_mut() = StatusCode::BAD_REQUEST;
     148               0 :                     return resp;
     149                 :                 }
     150               0 :             }
     151               0 :             let filename = route.split('/').last().unwrap().to_string();
     152               0 :             info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
     153                 : 
     154                 :             // get ext_name and path from spec
     155                 :             // don't lock compute_state for too long
     156               0 :             let ext = {
     157               0 :                 let compute_state = compute.state.lock().unwrap();
     158               0 :                 let pspec = compute_state.pspec.as_ref().expect("spec must be set");
     159               0 :                 let spec = &pspec.spec;
     160                 : 
     161                 :                 // debug only
     162               0 :                 info!("spec: {:?}", spec);
     163                 : 
     164               0 :                 let remote_extensions = match spec.remote_extensions.as_ref() {
     165               0 :                     Some(r) => r,
     166                 :                     None => {
     167               0 :                         info!("no remote extensions spec was provided");
     168               0 :                         let mut resp = Response::new(Body::from("no remote storage configured"));
     169               0 :                         *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
     170               0 :                         return resp;
     171                 :                     }
     172                 :                 };
     173                 : 
     174               0 :                 remote_extensions.get_ext(
     175               0 :                     &filename,
     176               0 :                     is_library,
     177               0 :                     &compute.build_tag,
     178               0 :                     &compute.pgversion,
     179               0 :                 )
     180               0 :             };
     181               0 : 
     182               0 :             match ext {
     183               0 :                 Ok((ext_name, ext_path)) => {
     184               0 :                     match compute.download_extension(ext_name, ext_path).await {
     185               0 :                         Ok(_) => Response::new(Body::from("OK")),
     186               0 :                         Err(e) => {
     187               0 :                             error!("extension download failed: {}", e);
     188               0 :                             let mut resp = Response::new(Body::from(e.to_string()));
     189               0 :                             *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
     190               0 :                             resp
     191                 :                         }
     192                 :                     }
     193                 :                 }
     194               0 :                 Err(e) => {
     195               0 :                     warn!("extension download failed to find extension: {}", e);
     196               0 :                     let mut resp = Response::new(Body::from("failed to find file"));
     197               0 :                     *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
     198               0 :                     resp
     199                 :                 }
     200                 :             }
     201                 :         }
     202                 : 
     203                 :         // Return the `404 Not Found` for any other routes.
     204                 :         _ => {
     205               0 :             let mut not_found = Response::new(Body::from("404 Not Found"));
     206               0 :             *not_found.status_mut() = StatusCode::NOT_FOUND;
     207               0 :             not_found
     208                 :         }
     209                 :     }
     210 CBC        1454 : }
     211                 : 
     212 UBC           0 : async fn handle_configure_request(
     213               0 :     req: Request<Body>,
     214               0 :     compute: &Arc<ComputeNode>,
     215               0 : ) -> Result<String, (String, StatusCode)> {
     216               0 :     if !compute.live_config_allowed {
     217               0 :         return Err((
     218               0 :             "live configuration is not allowed for this compute node".to_string(),
     219               0 :             StatusCode::PRECONDITION_FAILED,
     220               0 :         ));
     221               0 :     }
     222                 : 
     223               0 :     let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
     224               0 :     let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
     225               0 :     if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
     226               0 :         let spec = request.spec;
     227                 : 
     228               0 :         let parsed_spec = match ParsedSpec::try_from(spec) {
     229               0 :             Ok(ps) => ps,
     230               0 :             Err(msg) => return Err((msg, StatusCode::PRECONDITION_FAILED)),
     231                 :         };
     232                 : 
     233                 :         // XXX: wrap state update under lock in code blocks. Otherwise,
     234                 :         // we will try to `Send` `mut state` into the spawned thread
     235                 :         // bellow, which will cause error:
     236                 :         // ```
     237                 :         // error: future cannot be sent between threads safely
     238                 :         // ```
     239                 :         {
     240               0 :             let mut state = compute.state.lock().unwrap();
     241               0 :             if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
     242               0 :                 let msg = format!(
     243               0 :                     "invalid compute status for configuration request: {:?}",
     244               0 :                     state.status.clone()
     245               0 :                 );
     246               0 :                 return Err((msg, StatusCode::PRECONDITION_FAILED));
     247               0 :             }
     248               0 :             state.pspec = Some(parsed_spec);
     249               0 :             state.status = ComputeStatus::ConfigurationPending;
     250               0 :             compute.state_changed.notify_all();
     251               0 :             drop(state);
     252               0 :             info!("set new spec and notified waiters");
     253                 :         }
     254                 : 
     255                 :         // Spawn a blocking thread to wait for compute to become Running.
     256                 :         // This is needed to do not block the main pool of workers and
     257                 :         // be able to serve other requests while some particular request
     258                 :         // is waiting for compute to finish configuration.
     259               0 :         let c = compute.clone();
     260               0 :         task::spawn_blocking(move || {
     261               0 :             let mut state = c.state.lock().unwrap();
     262               0 :             while state.status != ComputeStatus::Running {
     263               0 :                 state = c.state_changed.wait(state).unwrap();
     264               0 :                 info!(
     265               0 :                     "waiting for compute to become Running, current status: {:?}",
     266               0 :                     state.status
     267               0 :                 );
     268                 : 
     269               0 :                 if state.status == ComputeStatus::Failed {
     270               0 :                     let err = state.error.as_ref().map_or("unknown error", |x| x);
     271               0 :                     let msg = format!("compute configuration failed: {:?}", err);
     272               0 :                     return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
     273               0 :                 }
     274                 :             }
     275                 : 
     276               0 :             Ok(())
     277               0 :         })
     278               0 :         .await
     279               0 :         .unwrap()?;
     280                 : 
     281                 :         // Return current compute state if everything went well.
     282               0 :         let state = compute.state.lock().unwrap().clone();
     283               0 :         let status_response = status_response_from_state(&state);
     284               0 :         Ok(serde_json::to_string(&status_response).unwrap())
     285                 :     } else {
     286               0 :         Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
     287                 :     }
     288               0 : }
     289                 : 
     290               0 : fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
     291               0 :     let error = GenericAPIError {
     292               0 :         error: e.to_string(),
     293               0 :     };
     294               0 :     Response::builder()
     295               0 :         .status(status)
     296               0 :         .body(Body::from(serde_json::to_string(&error).unwrap()))
     297               0 :         .unwrap()
     298               0 : }
     299                 : 
     300                 : // Main Hyper HTTP server function that runs it and blocks waiting on it forever.
     301                 : #[tokio::main]
     302 CBC         641 : async fn serve(port: u16, state: Arc<ComputeNode>) {
     303             641 :     // this usually binds to both IPv4 and IPv6 on linux
     304             641 :     // see e.g. https://github.com/rust-lang/rust/pull/34440
     305             641 :     let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
     306             641 : 
     307            1454 :     let make_service = make_service_fn(move |_conn| {
     308            1454 :         let state = state.clone();
     309            1454 :         async move {
     310            1454 :             Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
     311            1454 :                 let state = state.clone();
     312            1454 :                 async move {
     313            1454 :                     Ok::<_, Infallible>(
     314            1454 :                         // NOTE: We include the URI path in the string. It
     315            1454 :                         // doesn't contain any variable parts or sensitive
     316            1454 :                         // information in this API.
     317            1454 :                         tracing_utils::http::tracing_handler(
     318            1454 :                             req,
     319            1454 :                             |req| routes(req, &state),
     320            1454 :                             OtelName::UriPath,
     321            1454 :                         )
     322 UBC           0 :                         .await,
     323                 :                     )
     324 CBC        1454 :                 }
     325            1454 :             }))
     326            1454 :         }
     327            1454 :     });
     328             641 : 
     329             641 :     info!("starting HTTP server on {}", addr);
     330                 : 
     331             641 :     let server = Server::bind(&addr).serve(make_service);
     332                 : 
     333                 :     // Run this server forever
     334            1454 :     if let Err(e) = server.await {
     335 UBC           0 :         error!("server error: {}", e);
     336               0 :     }
     337                 : }
     338                 : 
     339                 : /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
     340 CBC         641 : pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
     341             641 :     let state = Arc::clone(state);
     342             641 : 
     343             641 :     Ok(thread::Builder::new()
     344             641 :         .name("http-endpoint".into())
     345             641 :         .spawn(move || serve(port, state))?)
     346             641 : }
        

Generated by: LCOV version 2.1-beta