Line data Source code
1 : use std::path::Path;
2 : use std::sync::Arc;
3 :
4 : use anyhow::Context;
5 : use axum::body::Body;
6 : use axum::extract::State;
7 : use axum::response::Response;
8 : use http::header::CONTENT_TYPE;
9 : use http_body_util::BodyExt;
10 : use hyper::{Request, StatusCode};
11 : use metrics::proto::MetricFamily;
12 : use metrics::{Encoder, TextEncoder};
13 :
14 : use crate::communicator_socket_client::connect_communicator_socket;
15 : use crate::compute::ComputeNode;
16 : use crate::hadron_metrics;
17 : use crate::http::JsonResponse;
18 : use crate::metrics::collect;
19 :
20 : /// Expose Prometheus metrics.
21 0 : pub(in crate::http) async fn get_metrics() -> Response {
22 : // When we call TextEncoder::encode() below, it will immediately return an
23 : // error if a metric family has no metrics, so we need to preemptively
24 : // filter out metric families with no metrics.
25 0 : let mut metrics = collect()
26 0 : .into_iter()
27 0 : .filter(|m| !m.get_metric().is_empty())
28 0 : .collect::<Vec<MetricFamily>>();
29 :
30 : // Add Hadron metrics.
31 0 : let hadron_metrics: Vec<MetricFamily> = hadron_metrics::collect()
32 0 : .into_iter()
33 0 : .filter(|m| !m.get_metric().is_empty())
34 0 : .collect();
35 0 : metrics.extend(hadron_metrics);
36 :
37 0 : let encoder = TextEncoder::new();
38 0 : let mut buffer = vec![];
39 :
40 0 : if let Err(e) = encoder.encode(&metrics, &mut buffer) {
41 0 : return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
42 0 : }
43 :
44 0 : Response::builder()
45 0 : .status(StatusCode::OK)
46 0 : .header(CONTENT_TYPE, encoder.format_type())
47 0 : .body(Body::from(buffer))
48 0 : .unwrap()
49 0 : }
50 :
51 : /// Fetch and forward metrics from the Postgres neon extension's metrics
52 : /// exporter that are used by autoscaling-agent.
53 : ///
54 : /// The neon extension exposes these metrics over a Unix domain socket
55 : /// in the data directory. That's not accessible directly from the outside
56 : /// world, so we have this endpoint in compute_ctl to expose it
57 0 : pub(in crate::http) async fn get_autoscaling_metrics(
58 0 : State(compute): State<Arc<ComputeNode>>,
59 0 : ) -> Result<Response, Response> {
60 0 : let pgdata = Path::new(&compute.params.pgdata);
61 :
62 : // Connect to the communicator process's metrics socket
63 0 : let mut metrics_client = connect_communicator_socket(pgdata)
64 0 : .await
65 0 : .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
66 :
67 : // Make a request for /autoscaling_metrics
68 0 : let request = Request::builder()
69 0 : .method("GET")
70 0 : .uri("/autoscaling_metrics")
71 0 : .header("Host", "localhost") // hyper requires Host, even though the server won't care
72 0 : .body(Body::from(""))
73 0 : .unwrap();
74 0 : let resp = metrics_client
75 0 : .send_request(request)
76 0 : .await
77 0 : .context("fetching metrics from Postgres metrics service")
78 0 : .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
79 :
80 : // Build a response that just forwards the response we got.
81 0 : let mut response = Response::builder();
82 0 : response = response.status(resp.status());
83 0 : if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
84 0 : response = response.header(CONTENT_TYPE, content_type);
85 0 : }
86 0 : let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
87 0 : Ok(response.body(body).unwrap())
88 0 : }
|