Line data Source code
1 : use std::path::Path;
2 : use std::sync::Arc;
3 :
4 : use anyhow::Context;
5 : use axum::body::Body;
6 : use axum::extract::State;
7 : use axum::response::Response;
8 : use http::header::CONTENT_TYPE;
9 : use http_body_util::BodyExt;
10 : use hyper::{Request, StatusCode};
11 : use metrics::proto::MetricFamily;
12 : use metrics::{Encoder, TextEncoder};
13 :
14 : use crate::communicator_socket_client::connect_communicator_socket;
15 : use crate::compute::ComputeNode;
16 : use crate::http::JsonResponse;
17 : use crate::metrics::collect;
18 :
19 : /// Expose Prometheus metrics.
20 0 : pub(in crate::http) async fn get_metrics() -> Response {
21 : // When we call TextEncoder::encode() below, it will immediately return an
22 : // error if a metric family has no metrics, so we need to preemptively
23 : // filter out metric families with no metrics.
24 0 : let metrics = collect()
25 0 : .into_iter()
26 0 : .filter(|m| !m.get_metric().is_empty())
27 0 : .collect::<Vec<MetricFamily>>();
28 :
29 0 : let encoder = TextEncoder::new();
30 0 : let mut buffer = vec![];
31 :
32 0 : if let Err(e) = encoder.encode(&metrics, &mut buffer) {
33 0 : return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
34 0 : }
35 :
36 0 : Response::builder()
37 0 : .status(StatusCode::OK)
38 0 : .header(CONTENT_TYPE, encoder.format_type())
39 0 : .body(Body::from(buffer))
40 0 : .unwrap()
41 0 : }
42 :
43 : /// Fetch and forward metrics from the Postgres neon extension's metrics
44 : /// exporter that are used by autoscaling-agent.
45 : ///
46 : /// The neon extension exposes these metrics over a Unix domain socket
47 : /// in the data directory. That's not accessible directly from the outside
48 : /// world, so we have this endpoint in compute_ctl to expose it
49 0 : pub(in crate::http) async fn get_autoscaling_metrics(
50 0 : State(compute): State<Arc<ComputeNode>>,
51 0 : ) -> Result<Response, Response> {
52 0 : let pgdata = Path::new(&compute.params.pgdata);
53 :
54 : // Connect to the communicator process's metrics socket
55 0 : let mut metrics_client = connect_communicator_socket(pgdata)
56 0 : .await
57 0 : .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
58 :
59 : // Make a request for /autoscaling_metrics
60 0 : let request = Request::builder()
61 0 : .method("GET")
62 0 : .uri("/autoscaling_metrics")
63 0 : .header("Host", "localhost") // hyper requires Host, even though the server won't care
64 0 : .body(Body::from(""))
65 0 : .unwrap();
66 0 : let resp = metrics_client
67 0 : .send_request(request)
68 0 : .await
69 0 : .context("fetching metrics from Postgres metrics service")
70 0 : .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
71 :
72 : // Build a response that just forwards the response we got.
73 0 : let mut response = Response::builder();
74 0 : response = response.status(resp.status());
75 0 : if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
76 0 : response = response.header(CONTENT_TYPE, content_type);
77 0 : }
78 0 : let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
79 0 : Ok(response.body(body).unwrap())
80 0 : }
|