Line data Source code
1 : use std::convert::Infallible;
2 : use std::net::TcpListener;
3 : use std::sync::{Arc, Mutex};
4 :
5 : use anyhow::{anyhow, bail};
6 : use http_utils::endpoint::{self, profile_cpu_handler, profile_heap_handler, request_span};
7 : use http_utils::error::ApiError;
8 : use http_utils::json::json_response;
9 : use http_utils::{RouterBuilder, RouterService};
10 : use hyper0::header::CONTENT_TYPE;
11 : use hyper0::{Body, Request, Response, StatusCode};
12 : use measured::MetricGroup;
13 : use measured::text::BufferedTextEncoder;
14 : use metrics::NeonMetrics;
15 : use tracing::{info, info_span};
16 :
17 : use crate::ext::{LockExt, TaskExt};
18 : use crate::jemalloc;
19 :
20 0 : async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
21 0 : json_response(StatusCode::OK, "")
22 0 : }
23 :
24 0 : fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
25 0 : let state = Arc::new(Mutex::new(PrometheusHandler {
26 0 : encoder: BufferedTextEncoder::new(),
27 0 : metrics,
28 0 : }));
29 :
30 0 : endpoint::make_router()
31 0 : .get("/metrics", move |r| {
32 0 : let state = state.clone();
33 0 : request_span(r, move |b| prometheus_metrics_handler(b, state))
34 0 : })
35 0 : .get("/v1/status", status_handler)
36 0 : .get("/profile/cpu", move |r| {
37 0 : request_span(r, profile_cpu_handler)
38 0 : })
39 0 : .get("/profile/heap", move |r| {
40 0 : request_span(r, profile_heap_handler)
41 0 : })
42 0 : }
43 :
44 0 : pub async fn task_main(
45 0 : http_listener: TcpListener,
46 0 : metrics: AppMetrics,
47 0 : ) -> anyhow::Result<Infallible> {
48 0 : scopeguard::defer! {
49 : info!("http has shut down");
50 : }
51 :
52 0 : let service = || RouterService::new(make_router(metrics).build()?);
53 :
54 0 : hyper0::Server::from_tcp(http_listener)?
55 0 : .serve(service().map_err(|e| anyhow!(e))?)
56 0 : .await?;
57 :
58 0 : bail!("hyper server without shutdown handling cannot shutdown successfully");
59 0 : }
60 :
61 : struct PrometheusHandler {
62 : encoder: BufferedTextEncoder,
63 : metrics: AppMetrics,
64 : }
65 :
66 : #[derive(MetricGroup)]
67 : pub struct AppMetrics {
68 : #[metric(namespace = "jemalloc")]
69 : pub jemalloc: Option<jemalloc::MetricRecorder>,
70 : #[metric(flatten)]
71 : pub neon_metrics: NeonMetrics,
72 : #[metric(flatten)]
73 : pub proxy: &'static crate::metrics::Metrics,
74 : }
75 :
76 0 : async fn prometheus_metrics_handler(
77 0 : _req: Request<Body>,
78 0 : state: Arc<Mutex<PrometheusHandler>>,
79 0 : ) -> Result<Response<Body>, ApiError> {
80 0 : let started_at = std::time::Instant::now();
81 :
82 0 : let span = info_span!("blocking");
83 0 : let body = tokio::task::spawn_blocking(move || {
84 0 : let _span = span.entered();
85 :
86 0 : let mut state = state.lock_propagate_poison();
87 0 : let PrometheusHandler { encoder, metrics } = &mut *state;
88 :
89 0 : metrics
90 0 : .collect_group_into(&mut *encoder)
91 0 : .unwrap_or_else(|infallible| match infallible {});
92 :
93 0 : let body = encoder.finish();
94 :
95 0 : tracing::info!(
96 0 : bytes = body.len(),
97 0 : elapsed_ms = started_at.elapsed().as_millis(),
98 0 : "responded /metrics"
99 : );
100 :
101 0 : body
102 0 : })
103 0 : .await
104 0 : .propagate_task_panic();
105 :
106 0 : let response = Response::builder()
107 0 : .status(200)
108 0 : .header(CONTENT_TYPE, "text/plain; version=0.0.4")
109 0 : .body(Body::from(body))
110 0 : .expect("response headers should be valid");
111 :
112 0 : Ok(response)
113 0 : }
|