LCOV - code coverage report
Current view: top level - proxy/src/http - health_server.rs (source / functions) Coverage Total Hit
Test: b9d67f908f91f00e353a27440ba89f642a869959.info Lines: 0.0 % 65 0
Test Date: 2024-11-19 21:44:13 Functions: 0.0 % 12 0

            Line data    Source code
       1              : use std::convert::Infallible;
       2              : use std::net::TcpListener;
       3              : use std::sync::{Arc, Mutex};
       4              : 
       5              : use anyhow::{anyhow, bail};
       6              : use hyper0::header::CONTENT_TYPE;
       7              : use hyper0::{Body, Request, Response, StatusCode};
       8              : use measured::text::BufferedTextEncoder;
       9              : use measured::MetricGroup;
      10              : use metrics::NeonMetrics;
      11              : use tracing::{info, info_span};
      12              : use utils::http::endpoint::{self, request_span};
      13              : use utils::http::error::ApiError;
      14              : use utils::http::json::json_response;
      15              : use utils::http::{RouterBuilder, RouterService};
      16              : 
      17              : use crate::jemalloc;
      18              : 
      19            0 : async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
                            // Status endpoint; `make_router` registers it for GET `/v1/status`.
                            // The incoming request is ignored entirely: the reply is built by
                            // `json_response` with `StatusCode::OK` and an empty string as payload.
      20            0 :     json_response(StatusCode::OK, "")
      21            0 : }
      22              : 
      23            0 : fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
                            // Builds the route table of the health server on top of
                            // `endpoint::make_router()`:
                            //   GET /metrics   -> `prometheus_metrics_handler`, wrapped in `request_span`
                            //   GET /v1/status -> `status_handler`
                            //
                            // `metrics` is moved, together with a fresh `BufferedTextEncoder`, into a
                            // single `PrometheusHandler` behind `Arc<Mutex<..>>`, so all `/metrics`
                            // invocations share one encoder and one metric group.
      24            0 :     let state = Arc::new(Mutex::new(PrometheusHandler {
      25            0 :         encoder: BufferedTextEncoder::new(),
      26            0 :         metrics,
      27            0 :     }));
      28            0 : 
      29            0 :     endpoint::make_router()
      30            0 :         .get("/metrics", move |r| {
                                    // Each invocation of the route closure takes its own `Arc` handle so
                                    // that it can be moved into the inner handler closure.
      31            0 :             let state = state.clone();
      32            0 :             request_span(r, move |b| prometheus_metrics_handler(b, state))
      33            0 :         })
      34            0 :         .get("/v1/status", status_handler)
      35            0 : }
      36              : 
      37            0 : pub async fn task_main(
      38            0 :     http_listener: TcpListener,
      39            0 :     metrics: AppMetrics,
      40            0 : ) -> anyhow::Result<Infallible> {
                            // Serves the routes from `make_router(metrics)` on `http_listener` (a
                            // `std::net::TcpListener`) using `hyper0::Server`.
                            //
                            // The `Ok` type is `Infallible`, so every return from this function is an
                            // `Err`: either an error propagated by `?` from `from_tcp`, from building
                            // the router service, or from the server future, or the final `bail!`
                            // that is reached if the server future completes without an error.

                            // Deferred guard: the log line is emitted whenever this scope is left.
      41            0 :     scopeguard::defer! {
      42            0 :         info!("http has shut down");
      43            0 :     }
      44            0 : 
                            // Wrapped in a closure so that the `?` on `build()` has its own
                            // `Result`-returning scope; the closure is called exactly once below and
                            // consumes `metrics`.
      45            0 :     let service = || RouterService::new(make_router(metrics).build()?);
      46              : 
      47            0 :     hyper0::Server::from_tcp(http_listener)?
                                // The service construction error is converted into `anyhow::Error`
                                // before being propagated.
      48            0 :         .serve(service().map_err(|e| anyhow!(e))?)
      49            0 :         .await?;
      50              : 
                            // No shutdown signal is wired into the server above, so reaching this
                            // point is reported as an error rather than a clean exit.
      51            0 :     bail!("hyper server without shutdown handling cannot shutdown successfully");
      52            0 : }
      53              : 
      54              : struct PrometheusHandler {
                            // State behind the `/metrics` route. `make_router` creates one instance
                            // and shares it as `Arc<Mutex<PrometheusHandler>>`.

                            // Text encoder the metric group is collected into; `finish()` is called
                            // on it once per request to obtain the response body.
      55              :     encoder: BufferedTextEncoder,
                            // The metric group that gets encoded on every `/metrics` request.
      56              :     metrics: AppMetrics,
      57              : }
      58              : 
      59              : #[derive(MetricGroup)]
                        /// Top-level metric group served by the `/metrics` route: it is stored in
                        /// `PrometheusHandler` and encoded via `collect_group_into`.
                        ///
                        /// NOTE(review): the exact effect of the `#[metric(..)]` attributes is defined
                        /// by the `measured` derive macro; the field comments below describe the
                        /// presumed meaning and should be confirmed against its documentation.
      60              : pub struct AppMetrics {
                            // Optional jemalloc recorder; presumably exported under the `jemalloc`
                            // namespace and omitted when `None` -- TODO confirm.
      61              :     #[metric(namespace = "jemalloc")]
      62              :     pub jemalloc: Option<jemalloc::MetricRecorder>,
                            // Metrics from the `metrics` crate's `NeonMetrics`; `flatten` presumably
                            // merges them into this group without an extra namespace.
      63              :     #[metric(flatten)]
      64              :     pub neon_metrics: NeonMetrics,
                            // Reference to this crate's `Metrics` value with `'static` lifetime, also
                            // flattened into this group.
      65              :     #[metric(flatten)]
      66              :     pub proxy: &'static crate::metrics::Metrics,
      67              : }
      68              : 
      69            0 : async fn prometheus_metrics_handler(
      70            0 :     _req: Request<Body>,
      71            0 :     state: Arc<Mutex<PrometheusHandler>>,
      72            0 : ) -> Result<Response<Body>, ApiError> {
                            // Handles GET `/metrics`: encodes the shared `AppMetrics` group with the
                            // shared text encoder and returns the result as the response body with
                            // status 200 and content type `text/plain; version=0.0.4`.
                            //
                            // The request itself is unused. As written this function never returns
                            // `Err`; every failure path is an `unwrap()` and therefore a panic:
                            //   * the mutex is poisoned (`lock().unwrap()`),
                            //   * the blocking task could not be joined (`.await.unwrap()`),
                            //   * the response builder rejects its input (`.body(..).unwrap()`).
                            // NOTE(review): consider mapping the first two to an `ApiError` instead of
                            // panicking inside the HTTP handler.

                            // Taken before the blocking task is spawned, so the logged `elapsed_ms`
                            // also includes the wait for a blocking thread and for the mutex.
      73            0 :     let started_at = std::time::Instant::now();
      74              : 
      75            0 :     let span = info_span!("blocking");
                            // Collection and encoding run on tokio's blocking pool rather than on
                            // the async task; the `std::sync::Mutex` is only held inside this closure.
      76            0 :     let body = tokio::task::spawn_blocking(move || {
      77            0 :         let _span = span.entered();
      78            0 : 
      79            0 :         let mut state = state.lock().unwrap();
                                // Split the guard into two disjoint mutable borrows so the encoder can
                                // be written to while the metric group is being read.
      80            0 :         let PrometheusHandler { encoder, metrics } = &mut *state;
      81            0 : 
      82            0 :         metrics
      83            0 :             .collect_group_into(&mut *encoder)
                                    // The empty `match` shows the error type is uninhabited, so this
                                    // branch can never actually run.
      84            0 :             .unwrap_or_else(|infallible| match infallible {});
      85            0 : 
                                // Take the encoded output out of the encoder as the response body.
      86            0 :         let body = encoder.finish();
      87            0 : 
      88            0 :         tracing::info!(
      89            0 :             bytes = body.len(),
      90            0 :             elapsed_ms = started_at.elapsed().as_millis(),
      91            0 :             "responded /metrics"
      92              :         );
      93              : 
      94            0 :         body
      95            0 :     })
      96            0 :     .await
                            // Panics on a join error from the blocking task (e.g. the closure
                            // panicked).
      97            0 :     .unwrap();
      98            0 : 
      99            0 :     let response = Response::builder()
     100            0 :         .status(200)
     101            0 :         .header(CONTENT_TYPE, "text/plain; version=0.0.4")
     102            0 :         .body(Body::from(body))
                                // Status and header are fixed literals here.
     103            0 :         .unwrap();
     104            0 : 
     105            0 :     Ok(response)
     106            0 : }
        

Generated by: LCOV version 2.1-beta