Line data Source code
1 : use anyhow::{anyhow, bail};
2 : use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
3 : use measured::{text::BufferedTextEncoder, MetricGroup};
4 : use metrics::NeonMetrics;
5 : use std::{
6 : convert::Infallible,
7 : net::TcpListener,
8 : sync::{Arc, Mutex},
9 : };
10 : use tracing::{info, info_span};
11 : use utils::http::{
12 : endpoint::{self, request_span},
13 : error::ApiError,
14 : json::json_response,
15 : RouterBuilder, RouterService,
16 : };
17 :
18 : use crate::jemalloc;
19 :
20 0 : async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
21 0 : json_response(StatusCode::OK, "")
22 0 : }
23 :
24 0 : fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
25 0 : let state = Arc::new(Mutex::new(PrometheusHandler {
26 0 : encoder: BufferedTextEncoder::new(),
27 0 : metrics,
28 0 : }));
29 0 :
30 0 : endpoint::make_router()
31 0 : .get("/metrics", move |r| {
32 0 : let state = state.clone();
33 0 : request_span(r, move |b| prometheus_metrics_handler(b, state))
34 0 : })
35 0 : .get("/v1/status", status_handler)
36 0 : }
37 :
38 0 : pub async fn task_main(
39 0 : http_listener: TcpListener,
40 0 : metrics: AppMetrics,
41 0 : ) -> anyhow::Result<Infallible> {
42 0 : scopeguard::defer! {
43 0 : info!("http has shut down");
44 0 : }
45 0 :
46 0 : let service = || RouterService::new(make_router(metrics).build()?);
47 :
48 0 : hyper::Server::from_tcp(http_listener)?
49 0 : .serve(service().map_err(|e| anyhow!(e))?)
50 0 : .await?;
51 :
52 0 : bail!("hyper server without shutdown handling cannot shutdown successfully");
53 0 : }
54 :
55 : struct PrometheusHandler {
56 : encoder: BufferedTextEncoder,
57 : metrics: AppMetrics,
58 : }
59 :
60 : #[derive(MetricGroup)]
61 : pub struct AppMetrics {
62 : #[metric(namespace = "jemalloc")]
63 : pub jemalloc: Option<jemalloc::MetricRecorder>,
64 : #[metric(flatten)]
65 : pub neon_metrics: NeonMetrics,
66 : #[metric(flatten)]
67 : pub proxy: &'static crate::metrics::Metrics,
68 : }
69 :
70 0 : async fn prometheus_metrics_handler(
71 0 : _req: Request<Body>,
72 0 : state: Arc<Mutex<PrometheusHandler>>,
73 0 : ) -> Result<Response<Body>, ApiError> {
74 0 : let started_at = std::time::Instant::now();
75 :
76 0 : let span = info_span!("blocking");
77 0 : let body = tokio::task::spawn_blocking(move || {
78 0 : let _span = span.entered();
79 0 :
80 0 : let mut state = state.lock().unwrap();
81 0 : let PrometheusHandler { encoder, metrics } = &mut *state;
82 0 :
83 0 : metrics
84 0 : .collect_group_into(&mut *encoder)
85 0 : .unwrap_or_else(|infallible| match infallible {});
86 0 :
87 0 : let body = encoder.finish();
88 0 :
89 0 : tracing::info!(
90 0 : bytes = body.len(),
91 0 : elapsed_ms = started_at.elapsed().as_millis(),
92 0 : "responded /metrics"
93 : );
94 :
95 0 : body
96 0 : })
97 0 : .await
98 0 : .unwrap();
99 0 :
100 0 : let response = Response::builder()
101 0 : .status(200)
102 0 : .header(CONTENT_TYPE, "text/plain; version=0.0.4")
103 0 : .body(Body::from(body))
104 0 : .unwrap();
105 0 :
106 0 : Ok(response)
107 0 : }
|