Line data Source code
1 : //! Communicator control socket.
2 : //!
//! Currently, the control socket is used to expose information about the communicator
//! process, the file cache, etc. as Prometheus metrics. In the future, it can be used to
//! expose more things.
6 : //!
//! The exporter speaks HTTP and listens on a Unix domain socket under the Postgres
//! data directory. For debugging, you can access it with curl:
9 : //!
10 : //! ```sh
11 : //! curl --unix-socket neon-communicator.socket http://localhost/metrics
12 : //! ```
13 : //!
14 : use axum::Router;
15 : use axum::body::Body;
16 : use axum::extract::State;
17 : use axum::response::Response;
18 : use http::StatusCode;
19 : use http::header::CONTENT_TYPE;
20 :
21 : use measured::MetricGroup;
22 : use measured::text::BufferedTextEncoder;
23 :
24 : use std::io::ErrorKind;
25 :
26 : use tokio::net::UnixListener;
27 :
28 : use crate::NEON_COMMUNICATOR_SOCKET_NAME;
29 : use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
30 :
31 : impl CommunicatorWorkerProcessStruct {
32 : /// Launch the listener
33 0 : pub(crate) async fn launch_control_socket_listener(
34 0 : &'static self,
35 0 : ) -> Result<(), std::io::Error> {
36 : use axum::routing::get;
37 0 : let app = Router::new()
38 0 : .route("/metrics", get(get_metrics))
39 0 : .route("/autoscaling_metrics", get(get_autoscaling_metrics))
40 0 : .route("/debug/panic", get(handle_debug_panic))
41 0 : .with_state(self);
42 :
43 : // If the server is restarted, there might be an old socket still
44 : // lying around. Remove it first.
45 0 : match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
46 : Ok(()) => {
47 0 : tracing::warn!("removed stale control socket");
48 : }
49 0 : Err(e) if e.kind() == ErrorKind::NotFound => {}
50 0 : Err(e) => {
51 0 : tracing::error!("could not remove stale control socket: {e:#}");
52 : // Try to proceed anyway. It will likely fail below though.
53 : }
54 : };
55 :
56 : // Create the unix domain socket and start listening on it
57 0 : let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
58 :
59 0 : tokio::spawn(async {
60 0 : tracing::info!("control socket listener spawned");
61 0 : axum::serve(listener, app)
62 0 : .await
63 0 : .expect("axum::serve never returns")
64 0 : });
65 :
66 0 : Ok(())
67 0 : }
68 : }
69 :
70 : /// Expose all Prometheus metrics.
71 0 : async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response {
72 0 : tracing::trace!("/metrics requested");
73 0 : metrics_to_response(&state).await
74 0 : }
75 :
76 : /// Expose Prometheus metrics, for use by the autoscaling agent.
77 : ///
78 : /// This is a subset of all the metrics.
79 0 : async fn get_autoscaling_metrics(
80 0 : State(state): State<&CommunicatorWorkerProcessStruct>,
81 0 : ) -> Response {
82 0 : tracing::trace!("/metrics requested");
83 0 : metrics_to_response(&state.lfc_metrics).await
84 0 : }
85 :
86 0 : async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response {
87 0 : panic!("test HTTP handler task panic");
88 : }
89 :
90 : /// Helper function to convert prometheus metrics to a text response
91 0 : async fn metrics_to_response(metrics: &(dyn MetricGroup<BufferedTextEncoder> + Sync)) -> Response {
92 0 : let mut enc = BufferedTextEncoder::new();
93 0 : metrics
94 0 : .collect_group_into(&mut enc)
95 0 : .unwrap_or_else(|never| match never {});
96 :
97 0 : Response::builder()
98 0 : .status(StatusCode::OK)
99 0 : .header(CONTENT_TYPE, "application/text")
100 0 : .body(Body::from(enc.finish()))
101 0 : .unwrap()
102 0 : }
|