Line data Source code
1 : use std::convert::Infallible;
2 : use std::net::TcpListener;
3 : use std::sync::{Arc, Mutex};
4 :
5 : use anyhow::{anyhow, bail};
6 : use hyper0::header::CONTENT_TYPE;
7 : use hyper0::{Body, Request, Response, StatusCode};
8 : use measured::text::BufferedTextEncoder;
9 : use measured::MetricGroup;
10 : use metrics::NeonMetrics;
11 : use tracing::{info, info_span};
12 : use utils::http::endpoint::{self, request_span};
13 : use utils::http::error::ApiError;
14 : use utils::http::json::json_response;
15 : use utils::http::{RouterBuilder, RouterService};
16 :
17 : use crate::jemalloc;
18 :
19 0 : async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
20 0 : json_response(StatusCode::OK, "")
21 0 : }
22 :
23 0 : fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
24 0 : let state = Arc::new(Mutex::new(PrometheusHandler {
25 0 : encoder: BufferedTextEncoder::new(),
26 0 : metrics,
27 0 : }));
28 0 :
29 0 : endpoint::make_router()
30 0 : .get("/metrics", move |r| {
31 0 : let state = state.clone();
32 0 : request_span(r, move |b| prometheus_metrics_handler(b, state))
33 0 : })
34 0 : .get("/v1/status", status_handler)
35 0 : }
36 :
37 0 : pub async fn task_main(
38 0 : http_listener: TcpListener,
39 0 : metrics: AppMetrics,
40 0 : ) -> anyhow::Result<Infallible> {
41 0 : scopeguard::defer! {
42 0 : info!("http has shut down");
43 0 : }
44 0 :
45 0 : let service = || RouterService::new(make_router(metrics).build()?);
46 :
47 0 : hyper0::Server::from_tcp(http_listener)?
48 0 : .serve(service().map_err(|e| anyhow!(e))?)
49 0 : .await?;
50 :
51 0 : bail!("hyper server without shutdown handling cannot shutdown successfully");
52 0 : }
53 :
/// State behind the `/metrics` endpoint.
///
/// `make_router` wraps one instance in `Arc<Mutex<_>>`, and
/// `prometheus_metrics_handler` locks it for each request.
54 : struct PrometheusHandler {
/// Text encoder the metric group is collected into; `finish()` on it yields
/// the response body.
55 : encoder: BufferedTextEncoder,
/// The metric group that gets collected on every `/metrics` request.
56 : metrics: AppMetrics,
57 : }
58 :
/// Metric group exported by the `/metrics` endpoint.
///
/// Derives `MetricGroup`; the per-field `#[metric(...)]` attributes control
/// how each member is included in the group.
59 : #[derive(MetricGroup)]
60 : pub struct AppMetrics {
/// Optional jemalloc recorder, registered with the `jemalloc` namespace.
61 : #[metric(namespace = "jemalloc")]
62 : pub jemalloc: Option<jemalloc::MetricRecorder>,
/// `NeonMetrics` group from the `metrics` crate, marked `flatten`.
63 : #[metric(flatten)]
64 : pub neon_metrics: NeonMetrics,
/// `'static` reference to this crate's own `Metrics`, marked `flatten`.
65 : #[metric(flatten)]
66 : pub proxy: &'static crate::metrics::Metrics,
67 : }
68 :
69 0 : async fn prometheus_metrics_handler(
70 0 : _req: Request<Body>,
71 0 : state: Arc<Mutex<PrometheusHandler>>,
72 0 : ) -> Result<Response<Body>, ApiError> {
73 0 : let started_at = std::time::Instant::now();
74 :
75 0 : let span = info_span!("blocking");
76 0 : let body = tokio::task::spawn_blocking(move || {
77 0 : let _span = span.entered();
78 0 :
79 0 : let mut state = state.lock().unwrap();
80 0 : let PrometheusHandler { encoder, metrics } = &mut *state;
81 0 :
82 0 : metrics
83 0 : .collect_group_into(&mut *encoder)
84 0 : .unwrap_or_else(|infallible| match infallible {});
85 0 :
86 0 : let body = encoder.finish();
87 0 :
88 0 : tracing::info!(
89 0 : bytes = body.len(),
90 0 : elapsed_ms = started_at.elapsed().as_millis(),
91 0 : "responded /metrics"
92 : );
93 :
94 0 : body
95 0 : })
96 0 : .await
97 0 : .unwrap();
98 0 :
99 0 : let response = Response::builder()
100 0 : .status(200)
101 0 : .header(CONTENT_TYPE, "text/plain; version=0.0.4")
102 0 : .body(Body::from(body))
103 0 : .unwrap();
104 0 :
105 0 : Ok(response)
106 0 : }
|