LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - lib.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 0.0 % 58 0
Test Date: 2025-03-12 18:28:53 Functions: 0.0 % 15 0

            Line data    Source code
       1              : #![deny(unsafe_code)]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : #![cfg(target_os = "linux")]
       4              : 
       5              : use std::fmt::Debug;
       6              : use std::net::SocketAddr;
       7              : use std::time::Duration;
       8              : 
       9              : use anyhow::Context;
      10              : use axum::Router;
      11              : use axum::extract::ws::WebSocket;
      12              : use axum::extract::{State, WebSocketUpgrade};
      13              : use axum::response::Response;
      14              : use axum::routing::get;
      15              : use clap::Parser;
      16              : use futures::Future;
      17              : use runner::Runner;
      18              : use sysinfo::{RefreshKind, System, SystemExt};
      19              : use tokio::net::TcpListener;
      20              : use tokio::sync::broadcast;
      21              : use tokio::task::JoinHandle;
      22              : use tokio_util::sync::CancellationToken;
      23              : use tracing::{error, info};
      24              : 
      25              : // Code that interfaces with agent
      26              : pub mod dispatcher;
      27              : pub mod protocol;
      28              : 
      29              : pub mod cgroup;
      30              : pub mod filecache;
      31              : pub mod runner;
      32              : 
      33              : /// The vm-monitor is an autoscaling component started by compute_ctl.
      34              : ///
      35              : /// It carries out autoscaling decisions (upscaling/downscaling) and responds to
      36              : /// memory pressure by making requests to the autoscaler-agent.
      37              : #[derive(Debug, Parser)]
      38              : pub struct Args {
      39              :     /// The name of the cgroup we should monitor for memory.high events. This
      40              :     /// is the cgroup that postgres should be running in.
      41              :     #[arg(short, long)]
      42              :     pub cgroup: Option<String>,
      43              : 
      44              :     /// The connection string for the Postgres file cache we should manage.
      45              :     #[arg(short, long)]
      46              :     pub pgconnstr: Option<String>,
      47              : 
      48              :     /// The address we should listen on for connection requests. For the
      49              :     /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
      50              :     #[arg(short, long)]
      51            0 :     pub addr: String,
      52              : }
      53              : 
      54              : impl Args {
      55            0 :     pub fn addr(&self) -> &str {
      56            0 :         &self.addr
      57            0 :     }
      58              : }
      59              : 
      60              : /// The number of bytes in one mebibyte.
      61              : #[allow(non_upper_case_globals)]
      62              : const MiB: u64 = 1 << 20;
      63              : 
      64              : /// Convert a quantity in bytes to a quantity in mebibytes, generally for display
      65              : /// purposes. (Most calculations in this crate use bytes directly)
      66            0 : pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
      67            0 :     (bytes as f32) / (MiB as f32)
      68            0 : }
      69              : 
      70            0 : pub fn get_total_system_memory() -> u64 {
      71            0 :     System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
      72            0 : }
      73              : 
      74              : /// Global app state for the Axum server
      75              : #[derive(Debug, Clone)]
      76              : pub struct ServerState {
      77              :     /// Used to close old connections.
      78              :     ///
      79              :     /// When a new connection is made, we send a message signalling to the old
      80              :     /// connection to close.
      81              :     pub sender: broadcast::Sender<()>,
      82              : 
      83              :     /// Used to cancel all spawned threads in the monitor.
      84              :     pub token: CancellationToken,
      85              : 
      86              :     // The CLI args
      87              :     pub args: &'static Args,
      88              : }
      89              : 
      90              : /// Spawn a task that may get cancelled by the provided [`CancellationToken`].
      91              : ///
      92              : /// This is mainly meant to be called with futures that will be pending for a very
      93              : /// long time, or are not meant to return. If it is not desirable for the future to
      94              : /// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
      95              : /// be logged with `f`.
      96            0 : pub fn spawn_with_cancel<T, F>(
      97            0 :     token: CancellationToken,
      98            0 :     f: F,
      99            0 :     future: T,
     100            0 : ) -> JoinHandle<Option<T::Output>>
     101            0 : where
     102            0 :     T: Future + Send + 'static,
     103            0 :     T::Output: Send + 'static,
     104            0 :     F: FnOnce(&T::Output) + Send + 'static,
     105            0 : {
     106            0 :     tokio::spawn(async move {
     107            0 :         tokio::select! {
     108            0 :             _ = token.cancelled() => {
     109            0 :                 info!("received global kill signal");
     110            0 :                 None
     111              :             }
     112            0 :             res = future => {
     113            0 :                 f(&res);
     114            0 :                 Some(res)
     115              :             }
     116              :         }
     117            0 :     })
     118            0 : }
     119              : 
     120              : /// The entrypoint to the binary.
     121              : ///
     122              : /// Build the router, bind to `args.addr`, and serve the `/monitor` websocket route.
     123            0 : pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
     124            0 :     // This channel is used to close old connections. When a new connection is
     125            0 :     // made, we send a message signalling to the old connection to close.
     126            0 :     let (sender, _) = tokio::sync::broadcast::channel::<()>(1);
     127            0 : 
     128            0 :     let app = Router::new()
     129            0 :         // This route gets upgraded to a websocket connection. We only support
     130            0 :         // one connection at a time, which we enforce by killing old connections
     131            0 :         // when we receive a new one.
     132            0 :         .route("/monitor", get(ws_handler))
     133            0 :         .with_state(ServerState {
     134            0 :             sender,
     135            0 :             token,
     136            0 :             args,
     137            0 :         });
     138            0 : 
     139            0 :     let addr_str = args.addr();
     140            0 :     let addr: SocketAddr = addr_str.parse().expect("parsing address should not fail");
     141              : 
     142            0 :     let listener = TcpListener::bind(&addr)
     143            0 :         .await
     144            0 :         .with_context(|| format!("failed to bind to {addr}"))?;
     145            0 :     info!(addr_str, "server bound");
     146            0 :     axum::serve(listener, app.into_make_service())
     147            0 :         .await
     148            0 :         .context("server exited")?;
     149              : 
     150            0 :     Ok(())
     151            0 : }
     152              : 
     153              : /// Handles incoming websocket connections.
     154              : ///
     155              : /// If we are already connected to an agent, we kill that old connection
     156              : /// and accept the new one.
     157              : #[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
     158              : pub async fn ws_handler(
     159              :     ws: WebSocketUpgrade,
     160              :     State(ServerState {
     161              :         sender,
     162              :         token,
     163              :         args,
     164              :     }): State<ServerState>,
     165              : ) -> Response {
     166              :     // Kill the old monitor
     167              :     info!("closing old connection if there is one");
     168              :     let _ = sender.send(());
     169              : 
     170              :     // Start the new one. Wow, the cycle of death and rebirth
     171              :     let closer = sender.subscribe();
     172            0 :     ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
     173              : }
     174              : 
     175              : /// Starts the monitor. If startup fails or the monitor exits, an error will
     176              : /// be logged and our internal state will be reset to allow for new connections.
     177              : #[tracing::instrument(skip_all)]
     178              : async fn start_monitor(
     179              :     ws: WebSocket,
     180              :     args: &Args,
     181              :     kill: broadcast::Receiver<()>,
     182              :     token: CancellationToken,
     183              : ) {
     184              :     info!(
     185              :         ?args,
     186              :         "accepted new websocket connection -> starting monitor"
     187              :     );
     188              :     let timeout = Duration::from_secs(4);
     189              :     let monitor = tokio::time::timeout(
     190              :         timeout,
     191              :         Runner::new(Default::default(), args, ws, kill, token),
     192              :     )
     193              :     .await;
     194              :     let mut monitor = match monitor {
     195              :         Ok(Ok(monitor)) => monitor,
     196              :         Ok(Err(e)) => {
     197              :             error!(error = format_args!("{e:#}"), "failed to create monitor");
     198              :             return;
     199              :         }
     200              :         Err(_) => {
     201              :             error!(?timeout, "creating monitor timed out");
     202              :             return;
     203              :         }
     204              :     };
     205              :     info!("connected to agent");
     206              : 
     207              :     match monitor.run().await {
     208              :         Ok(()) => info!("monitor was killed due to new connection"),
     209              :         Err(e) => error!(
     210              :             error = format_args!("{e:#}"),
     211              :             "monitor terminated unexpectedly"
     212              :         ),
     213              :     }
     214              : }
        

Generated by: LCOV version 2.1-beta