LCOV - differential code coverage report
Current view: top level - libs/vm_monitor/src - lib.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 70 0 70
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 44 0 44
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : #![deny(unsafe_code)]
       2                 : #![deny(clippy::undocumented_unsafe_blocks)]
       3                 : #![cfg(target_os = "linux")]
       4                 : 
       5                 : use anyhow::Context;
       6                 : use axum::{
       7                 :     extract::{ws::WebSocket, State, WebSocketUpgrade},
       8                 :     response::Response,
       9                 : };
      10                 : use axum::{routing::get, Router, Server};
      11                 : use clap::Parser;
      12                 : use futures::Future;
      13                 : use std::{fmt::Debug, time::Duration};
      14                 : use sysinfo::{RefreshKind, System, SystemExt};
      15                 : use tokio::{sync::broadcast, task::JoinHandle};
      16                 : use tokio_util::sync::CancellationToken;
      17                 : use tracing::{error, info};
      18                 : 
      19                 : use runner::Runner;
      20                 : 
      21                 : // Code that interfaces with agent
      22                 : pub mod dispatcher;
      23                 : pub mod protocol;
      24                 : 
      25                 : pub mod cgroup;
      26                 : pub mod filecache;
      27                 : pub mod runner;
      28                 : 
      29                 : /// The vm-monitor is an autoscaling component started by compute_ctl.
      30                 : ///
      31                 : /// It carries out autoscaling decisions (upscaling/downscaling) and responds to
      32                 : /// memory pressure by making requests to the autoscaler-agent.
      33 UBC           0 : #[derive(Debug, Parser)]
      34                 : pub struct Args {
      35                 :     /// The name of the cgroup we should monitor for memory.high events. This
      36                 :     /// is the cgroup that postgres should be running in.
      37                 :     #[arg(short, long)]
      38                 :     pub cgroup: Option<String>,
      39                 : 
      40                 :     /// The connection string for the Postgres file cache we should manage.
      41                 :     #[arg(short, long)]
      42                 :     pub pgconnstr: Option<String>,
      43                 : 
      44                 :     /// The address we should listen on for connection requests. For the
      45                 :     /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
      46                 :     #[arg(short, long)]
      47               0 :     pub addr: String, // Stored as text; `start` parses it with `expect`, so an unparsable value panics there.
      48                 : }
      49                 : 
      50                 : impl Args {
      51               0 :     pub fn addr(&self) -> &str { // Borrows the listen address as `&str`; nothing is cloned.
      52               0 :         &self.addr
      53               0 :     }
      54                 : }
      55                 : 
      56                 : /// The number of bytes in one mebibyte (2^20 = 1,048,576).
      57                 : #[allow(non_upper_case_globals)]
      58                 : const MiB: u64 = 1 << 20;
      59                 : 
      60                 : /// Convert a quantity in bytes to a quantity in mebibytes, generally for display
      61                 : /// purposes; both operands are cast to `f32`, so the result is approximate. (Most calculations in this crate use bytes directly)
      62               0 : pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
      63               0 :     (bytes as f32) / (MiB as f32)
      64               0 : }
      65                 : 
      66               0 : pub fn get_total_system_memory() -> u64 { // Builds a fresh `System` refreshed for memory only and returns its `total_memory()`; NOTE(review): unit (bytes vs KiB) depends on the pinned `sysinfo` version — confirm.
      67               0 :     System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
      68               0 : }
      69                 : 
      70                 : /// Global app state for the Axum server
      71               0 : #[derive(Debug, Clone)]
      72                 : pub struct ServerState {
      73                 :     /// Used to close old connections.
      74                 :     ///
      75                 :     /// When a new connection is made, we send a message signalling to the old
      76                 :     /// connection to close.
      77                 :     pub sender: broadcast::Sender<()>,
      78                 : 
      79                 :     /// Used to cancel all spawned tasks in the monitor.
      80                 :     pub token: CancellationToken,
      81                 : 
      82                 :     /// The CLI args
      83                 :     pub args: &'static Args,
      84                 : }
      85                 : 
      86                 : /// Spawn a tokio task that may get cancelled by the provided [`CancellationToken`].
      87                 : ///
      88                 : /// This is mainly meant to be called with futures that will be pending for a very
      89                 : /// long time, or are not meant to return. If it is not desirable for the future to
      90                 : /// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
      91                 : /// be logged with `f`, which receives a reference to the output. The task resolves to `None` when the cancellation branch is taken and to `Some(output)` when the future completes.
      92               0 : pub fn spawn_with_cancel<T, F>(
      93               0 :     token: CancellationToken,
      94               0 :     f: F,
      95               0 :     future: T,
      96               0 : ) -> JoinHandle<Option<T::Output>>
      97               0 : where
      98               0 :     T: Future + Send + 'static,
      99               0 :     T::Output: Send + 'static,
     100               0 :     F: FnOnce(&T::Output) + Send + 'static,
     101               0 : {
     102               0 :     tokio::spawn(async move {
     103               0 :         tokio::select! {
     104                 :             _ = token.cancelled() => {
     105               0 :                 info!("received global kill signal");
     106                 :                 None
     107                 :             }
     108               0 :             res = future => {
     109                 :                 f(&res);
     110                 :                 Some(res)
     111                 :             }
     112                 :         }
     113               0 :     })
     114               0 : }
     115                 : 
     116                 : /// The entrypoint to the binary.
     117                 : ///
     118                 : /// Registers the `/monitor` websocket route, binds an HTTP server to `args.addr()` and serves until it exits. Panics if the address does not parse; bind and serve failures are returned as errors.
     119               0 : pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
     120               0 :     // This channel is used to close old connections. When a new connection is
     121               0 :     // made, we send a message signalling to the old connection to close.
     122               0 :     let (sender, _) = tokio::sync::broadcast::channel::<()>(1);
     123               0 : 
     124               0 :     let app = Router::new()
     125               0 :         // This route gets upgraded to a websocket connection. We only support
     126               0 :         // one connection at a time, which we enforce by killing old connections
     127               0 :         // when we receive a new one.
     128               0 :         .route("/monitor", get(ws_handler))
     129               0 :         .with_state(ServerState {
     130               0 :             sender,
     131               0 :             token,
     132               0 :             args,
     133               0 :         });
     134               0 : 
     135               0 :     let addr = args.addr();
     136               0 :     let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail"))
     137               0 :         .with_context(|| format!("failed to bind to {addr}"))?;
     138                 : 
     139               0 :     info!(addr, "server bound");
     140                 : 
     141               0 :     bound
     142               0 :         .serve(app.into_make_service())
     143               0 :         .await
     144               0 :         .context("server exited")?;
     145                 : 
     146               0 :     Ok(())
     147               0 : }
     148                 : 
     149                 : /// Handles incoming websocket connections.
     150                 : ///
     151                 : /// If we are already connected to an agent, we kill that old connection
     152                 : /// and accept the new one.
     153               0 : #[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
     154                 : pub async fn ws_handler(
     155                 :     ws: WebSocketUpgrade,
     156                 :     State(ServerState {
     157                 :         sender,
     158                 :         token,
     159                 :         args,
     160                 :     }): State<ServerState>,
     161                 : ) -> Response {
     162                 :     // Kill the old monitor
     163               0 :     info!("closing old connection if there is one");
     164                 :     let _ = sender.send(());
     165                 : 
     166                 :     // Start the new one. Wow, the cycle of death and rebirth
     167                 :     let closer = sender.subscribe();
     168               0 :     ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
     169                 : }
     170                 : 
     171                 : /// Starts the monitor. If startup fails or the monitor exits, an error will
     172                 : /// be logged and our internal state will be reset to allow for new connections.
     173               0 : #[tracing::instrument(skip_all)]
     174                 : async fn start_monitor(
     175                 :     ws: WebSocket,
     176                 :     args: &Args,
     177                 :     kill: broadcast::Receiver<()>,
     178                 :     token: CancellationToken,
     179                 : ) {
     180               0 :     info!(
     181               0 :         ?args,
     182               0 :         "accepted new websocket connection -> starting monitor"
     183               0 :     );
     184                 :     let timeout = Duration::from_secs(4); // Upper bound on `Runner::new`; on expiry we log and drop the connection instead of waiting forever.
     185                 :     let monitor = tokio::time::timeout(
     186                 :         timeout,
     187                 :         Runner::new(Default::default(), args, ws, kill, token),
     188                 :     )
     189                 :     .await;
     190                 :     let mut monitor = match monitor {
     191                 :         Ok(Ok(monitor)) => monitor,
     192                 :         Ok(Err(error)) => {
     193               0 :             error!(?error, "failed to create monitor");
     194                 :             return;
     195                 :         }
     196                 :         Err(_) => {
     197               0 :             error!(
     198               0 :                 ?timeout,
     199               0 :                 "creating monitor timed out (probably waiting to receive protocol range)"
     200               0 :             );
     201                 :             return;
     202                 :         }
     203                 :     };
     204               0 :     info!("connected to agent");
     205                 : 
     206                 :     match monitor.run().await {
     207               0 :         Ok(()) => info!("monitor was killed due to new connection"),
     208               0 :         Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
     209                 :     }
     210                 : }
        

Generated by: LCOV version 2.1-beta