LCOV - code coverage report
Current view: top level - pgxn/neon/communicator/src/worker_process - control_socket.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 0.0 % 45 0
Test Date: 2025-07-22 17:50:06 Functions: 0.0 % 11 0

            Line data    Source code
       1              : //! Communicator control socket.
       2              : //!
       3              : //! Currently, the control socket is used to provide information about the communicator
       4              : //! process, file cache etc. as prometheus metrics. In the future, it can be used to
       5              : //! expose more things.
       6              : //!
       7              : //! The exporter speaks HTTP, listens on a Unix Domain Socket under the Postgres
       8              : //! data directory. For debugging, you can access it with curl:
       9              : //!
      10              : //! ```sh
      11              : //! curl --unix-socket neon-communicator.socket http://localhost/metrics
      12              : //! ```
      13              : //!
      14              : use axum::Router;
      15              : use axum::body::Body;
      16              : use axum::extract::State;
      17              : use axum::response::Response;
      18              : use http::StatusCode;
      19              : use http::header::CONTENT_TYPE;
      20              : 
      21              : use measured::MetricGroup;
      22              : use measured::text::BufferedTextEncoder;
      23              : 
      24              : use std::io::ErrorKind;
      25              : 
      26              : use tokio::net::UnixListener;
      27              : 
      28              : use crate::NEON_COMMUNICATOR_SOCKET_NAME;
      29              : use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
      30              : 
      31              : impl CommunicatorWorkerProcessStruct {
      32              :     /// Bind the control socket and start serving HTTP requests on it.
      33              :     ///
      34              :     /// The socket is bound before this returns, so a bind failure is reported to
      35              :     /// the caller. Requests are then served from a spawned tokio task.
      36            0 :     pub(crate) async fn launch_control_socket_listener(
      37            0 :         &'static self,
      38            0 :     ) -> Result<(), std::io::Error> {
      39              :         use axum::routing::get;
      40            0 :         let router = Router::new()
      41            0 :             .route("/metrics", get(get_metrics))
      42            0 :             .route("/autoscaling_metrics", get(get_autoscaling_metrics))
      43            0 :             .route("/debug/panic", get(handle_debug_panic))
      44            0 :             .with_state(self);
      45              : 
      46              :         // A previous run of the server may have left its socket file behind.
      47              :         // Unlink it before binding; a missing file is the expected case.
      48            0 :         if let Err(e) = std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
      49            0 :             if e.kind() != ErrorKind::NotFound {
      50              :                 // Carry on regardless, although the bind below will likely fail.
      51            0 :                 tracing::error!("could not remove stale control socket: {e:#}");
      52              :             }
      53              :         } else {
      54            0 :             tracing::warn!("removed stale control socket");
      55              :         }
      56              : 
      57              :         // Create the Unix domain socket; `?` hands bind errors to the caller.
      58            0 :         let socket = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
      59              : 
      60            0 :         tokio::spawn(async {
      61            0 :             tracing::info!("control socket listener spawned");
      62            0 :             let served = axum::serve(socket, router).await;
      63            0 :             served.expect("axum::serve never returns")
      64            0 :         });
      65              : 
      66            0 :         Ok(())
      67            0 :     }
      68              : }
      69              : 
      70              : /// Handler for `/metrics`: renders all Prometheus metrics of the worker process.
      71            0 : async fn get_metrics(State(worker): State<&CommunicatorWorkerProcessStruct>) -> Response {
      72            0 :     tracing::trace!("/metrics requested");
      73            0 :     metrics_to_response(&worker).await
      74            0 : }
      75              : 
      76              : /// Expose Prometheus metrics, for use by the autoscaling agent.
      77              : ///
      78              : /// Only the `lfc_metrics` group is rendered, i.e. a subset of all the metrics.
      79            0 : async fn get_autoscaling_metrics(
      80            0 :     State(state): State<&CommunicatorWorkerProcessStruct>,
      81            0 : ) -> Response {
      82            0 :     tracing::trace!("/autoscaling_metrics requested");
      83            0 :     metrics_to_response(&state.lfc_metrics).await
      84            0 : }
      85              : 
      86            0 : async fn handle_debug_panic(State(_): State<&CommunicatorWorkerProcessStruct>) -> Response {
      87            0 :     panic!("test HTTP handler task panic"); // deliberate panic, for testing
      88              : }
      89              : 
      90              : /// Render `metrics` in the Prometheus text format and return it as a `200 OK` response.
      91            0 : async fn metrics_to_response(metrics: &(dyn MetricGroup<BufferedTextEncoder> + Sync)) -> Response {
      92            0 :     let mut enc = BufferedTextEncoder::new();
      93            0 :     metrics
      94            0 :         .collect_group_into(&mut enc)
      95            0 :         .unwrap_or_else(|never| match never {}); // the error type is uninhabited
      96            0 :     Response::builder()
      97            0 :         .status(StatusCode::OK)
      98              :         // Media type of the Prometheus text exposition format, as scrapers expect it.
      99            0 :         .header(CONTENT_TYPE, "text/plain; version=0.0.4")
     100            0 :         .body(Body::from(enc.finish()))
     101            0 :         .expect("constant status and header are always valid")
     102            0 : }
        

Generated by: LCOV version 2.1-beta