LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - lib.rs (source / functions) Coverage Total Hit
Test: 8ff8efadb0253cf618c612650348666c0c564111.info Lines: 0.0 % 61 0
Test Date: 2024-11-20 17:53:50 Functions: 0.0 % 27 0

            Line data    Source code
       1              : #![deny(unsafe_code)]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : #![cfg(target_os = "linux")]
       4              : 
       5              : use anyhow::Context;
       6              : use axum::{
       7              :     extract::{ws::WebSocket, State, WebSocketUpgrade},
       8              :     response::Response,
       9              : };
      10              : use axum::{routing::get, Router};
      11              : use clap::Parser;
      12              : use futures::Future;
      13              : use std::net::SocketAddr;
      14              : use std::{fmt::Debug, time::Duration};
      15              : use sysinfo::{RefreshKind, System, SystemExt};
      16              : use tokio::net::TcpListener;
      17              : use tokio::{sync::broadcast, task::JoinHandle};
      18              : use tokio_util::sync::CancellationToken;
      19              : use tracing::{error, info};
      20              : 
      21              : use runner::Runner;
      22              : 
      23              : // Code that interfaces with agent
      24              : pub mod dispatcher;
      25              : pub mod protocol;
      26              : 
      27              : pub mod cgroup;
      28              : pub mod filecache;
      29              : pub mod runner;
      30              : 
      31              : /// The vm-monitor is an autoscaling component started by compute_ctl.
      32              : ///
      33              : /// It carries out autoscaling decisions (upscaling/downscaling) and responds to
      34              : /// memory pressure by making requests to the autoscaler-agent.
      35            0 : #[derive(Debug, Parser)]
      36              : pub struct Args {
      37              :     /// The name of the cgroup we should monitor for memory.high events. This
      38              :     /// is the cgroup that postgres should be running in.
      39              :     #[arg(short, long)]
      40              :     pub cgroup: Option<String>,
      41              : 
      42              :     /// The connection string for the Postgres file cache we should manage.
      43              :     #[arg(short, long)]
      44              :     pub pgconnstr: Option<String>,
      45              : 
      46              :     /// The address we should listen on for connection requests. For the
      47              :     /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
      48              :     #[arg(short, long)]
      49            0 :     pub addr: String,
      50              : }
      51              : 
      52              : impl Args {
      53            0 :     pub fn addr(&self) -> &str {
      54            0 :         &self.addr
      55            0 :     }
      56              : }
      57              : 
      58              : /// The number of bytes in one mebibyte.
      59              : #[allow(non_upper_case_globals)]
      60              : const MiB: u64 = 1 << 20;
      61              : 
      62              : /// Convert a quantity in bytes to a quantity in mebibytes, generally for display
      63              : /// purposes. (Most calculations in this crate use bytes directly)
      64            0 : pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
      65            0 :     (bytes as f32) / (MiB as f32)
      66            0 : }
      67              : 
      68            0 : pub fn get_total_system_memory() -> u64 {
      69            0 :     System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
      70            0 : }
      71              : 
      72              : /// Global app state for the Axum server
      73              : #[derive(Debug, Clone)]
      74              : pub struct ServerState {
      75              :     /// Used to close old connections.
      76              :     ///
      77              :     /// When a new connection is made, we send a message signalling to the old
      78              :     /// connection to close.
      79              :     pub sender: broadcast::Sender<()>,
      80              : 
      81              :     /// Used to cancel all spawned threads in the monitor.
      82              :     pub token: CancellationToken,
      83              : 
      84              :     // The CLI args
      85              :     pub args: &'static Args,
      86              : }
      87              : 
      88              : /// Spawn a task that may get cancelled by the provided [`CancellationToken`].
      89              : ///
      90              : /// This is mainly meant to be called with futures that will be pending for a very
      91              : /// long time, or are not meant to return. If it is not desirable for the future to
      92              : /// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
      93              : /// be logged with `f`.
      94            0 : pub fn spawn_with_cancel<T, F>(
      95            0 :     token: CancellationToken,
      96            0 :     f: F,
      97            0 :     future: T,
      98            0 : ) -> JoinHandle<Option<T::Output>>
      99            0 : where
     100            0 :     T: Future + Send + 'static,
     101            0 :     T::Output: Send + 'static,
     102            0 :     F: FnOnce(&T::Output) + Send + 'static,
     103            0 : {
     104            0 :     tokio::spawn(async move {
     105            0 :         tokio::select! {
     106            0 :             _ = token.cancelled() => {
     107            0 :                 info!("received global kill signal");
     108            0 :                 None
     109              :             }
     110            0 :             res = future => {
     111            0 :                 f(&res);
     112            0 :                 Some(res)
     113              :             }
     114              :         }
     115            0 :     })
     116            0 : }
     117              : 
     118              : /// The entrypoint to the binary.
     119              : ///
     120              : /// Bind to the configured address and start an http server for the `/monitor` route.
     121            0 : pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
     122            0 :     // This channel is used to close old connections. When a new connection is
     123            0 :     // made, we send a message signalling to the old connection to close.
     124            0 :     let (sender, _) = tokio::sync::broadcast::channel::<()>(1);
     125            0 : 
     126            0 :     let app = Router::new()
     127            0 :         // This route gets upgraded to a websocket connection. We only support
     128            0 :         // one connection at a time, which we enforce by killing old connections
     129            0 :         // when we receive a new one.
     130            0 :         .route("/monitor", get(ws_handler))
     131            0 :         .with_state(ServerState {
     132            0 :             sender,
     133            0 :             token,
     134            0 :             args,
     135            0 :         });
     136            0 : 
     137            0 :     let addr_str = args.addr();
     138            0 :     let addr: SocketAddr = addr_str.parse().expect("parsing address should not fail");
     139              : 
     140            0 :     let listener = TcpListener::bind(&addr)
     141            0 :         .await
     142            0 :         .with_context(|| format!("failed to bind to {addr}"))?;
     143            0 :     info!(addr_str, "server bound");
     144            0 :     axum::serve(listener, app.into_make_service())
     145            0 :         .await
     146            0 :         .context("server exited")?;
     147              : 
     148            0 :     Ok(())
     149            0 : }
     150              : 
     151              : /// Handles incoming websocket connections.
     152              : ///
     153              : /// If we are already connected to an agent, we kill that old connection
     154              : /// and accept the new one.
     155            0 : #[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
     156              : pub async fn ws_handler(
     157              :     ws: WebSocketUpgrade,
     158              :     State(ServerState {
     159              :         sender,
     160              :         token,
     161              :         args,
     162              :     }): State<ServerState>,
     163              : ) -> Response {
     164              :     // Kill the old monitor
     165              :     info!("closing old connection if there is one");
     166              :     let _ = sender.send(());
     167              : 
     168              :     // Start the new one. Wow, the cycle of death and rebirth
     169              :     let closer = sender.subscribe();
     170            0 :     ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
     171              : }
     172              : 
     173              : /// Starts the monitor. If startup fails or the monitor exits, an error will
     174              : /// be logged and our internal state will be reset to allow for new connections.
     175            0 : #[tracing::instrument(skip_all)]
     176              : async fn start_monitor(
     177              :     ws: WebSocket,
     178              :     args: &Args,
     179              :     kill: broadcast::Receiver<()>,
     180              :     token: CancellationToken,
     181              : ) {
     182              :     info!(
     183              :         ?args,
     184              :         "accepted new websocket connection -> starting monitor"
     185              :     );
     186              :     let timeout = Duration::from_secs(4);
     187              :     let monitor = tokio::time::timeout(
     188              :         timeout,
     189              :         Runner::new(Default::default(), args, ws, kill, token),
     190              :     )
     191              :     .await;
     192              :     let mut monitor = match monitor {
     193              :         Ok(Ok(monitor)) => monitor,
     194              :         Ok(Err(error)) => {
     195              :             error!(?error, "failed to create monitor");
     196              :             return;
     197              :         }
     198              :         Err(_) => {
     199              :             error!(
     200              :                 ?timeout,
     201              :                 "creating monitor timed out (probably waiting to receive protocol range)"
     202              :             );
     203              :             return;
     204              :         }
     205              :     };
     206              :     info!("connected to agent");
     207              : 
     208              :     match monitor.run().await {
     209              :         Ok(()) => info!("monitor was killed due to new connection"),
     210              :         Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
     211              :     }
     212              : }
        

Generated by: LCOV version 2.1-beta