// libs/vm_monitor/src/lib.rs
// (LCOV 2.1-beta differential coverage snapshot f6946e90, 2023-10-19, vs.
// baseline c8637f37, 2023-10-18: 0.0% of 73 lines and 0.0% of 49 functions hit.)

#![cfg(target_os = "linux")]

use anyhow::Context;
use axum::{
    extract::{ws::WebSocket, State, WebSocketUpgrade},
    response::Response,
};
use axum::{routing::get, Router, Server};
use clap::Parser;
use futures::Future;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with the agent
pub mod dispatcher;
pub mod protocol;

pub mod cgroup;
pub mod filecache;
pub mod runner;

/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
#[derive(Debug, Parser)]
pub struct Args {
    /// The name of the cgroup we should monitor for memory.high events. This
    /// is the cgroup that postgres should be running in.
    #[arg(short, long)]
    pub cgroup: Option<String>,

    /// The connection string for the Postgres file cache we should manage.
    #[arg(short, long)]
    pub pgconnstr: Option<String>,

    /// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
    /// kernel's page cache), and therefore should not count against available memory.
    //
    // NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
    // than a roundabout way, via whether it's on disk), but in order to be backwards compatible
    // during the switch away from an in-memory file cache, we had to default to the previous
    // behavior.
    #[arg(long)]
    pub file_cache_on_disk: bool,

    /// The address we should listen on for connection requests. For the
    /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
    #[arg(short, long)]
    pub addr: String,
}

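// Hedged example (added for illustration; not part of the original file):
// `clap::Parser` gives `Args` a `parse_from`, so a minimal configuration can
// be exercised like this. The flag values here are assumptions, not
// compute_ctl's actual invocation.
#[cfg(test)]
mod args_example {
    use super::Args;
    use clap::Parser;

    #[test]
    fn parse_minimal_args() {
        // Only --addr is required; the cgroup and file cache are optional.
        let args = Args::parse_from(["vm-monitor", "--addr", "0.0.0.0:10301"]);
        assert_eq!(args.addr(), "0.0.0.0:10301");
        assert!(args.cgroup.is_none());
        assert!(!args.file_cache_on_disk);
    }
}
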
impl Args {
    pub fn addr(&self) -> &str {
        &self.addr
    }
}

/// The number of bytes in one mebibyte.
#[allow(non_upper_case_globals)]
const MiB: u64 = 1 << 20;

/// Convert a quantity in bytes to a quantity in mebibytes, generally for display
/// purposes. (Most calculations in this crate use bytes directly)
pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
    (bytes as f32) / (MiB as f32)
}

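// Worked example (illustrative; not in the original file): 1 << 20 is
// 1048576, so 5 MiB worth of bytes converts to exactly 5.0.
#[cfg(test)]
mod conversion_example {
    use super::{bytes_to_mebibytes, MiB};

    #[test]
    fn five_mebibytes() {
        assert_eq!(bytes_to_mebibytes(5 * MiB), 5.0);
    }
}
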
pub fn get_total_system_memory() -> u64 {
    System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}

/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
    /// Used to close old connections.
    ///
    /// When a new connection is made, we send a message signalling to the old
    /// connection to close.
    pub sender: broadcast::Sender<()>,

    /// Used to cancel all spawned tasks in the monitor.
    pub token: CancellationToken,

    /// The CLI args.
    pub args: &'static Args,
}

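// Sketch of the single-connection handoff that `sender` implements (added
// for illustration; not in the original file): the old connection holds a
// subscriber, and a `send(())` from the new connection tells it to close.
#[cfg(test)]
mod handoff_example {
    use tokio::sync::broadcast;

    #[test]
    fn new_connection_signals_old_one() {
        let (sender, _keep_alive) = broadcast::channel::<()>(1);
        let mut old_closer = sender.subscribe(); // held by the old connection
        sender.send(()).unwrap(); // the new connection broadcasts "close"
        assert!(old_closer.try_recv().is_ok());
    }
}
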
/// Spawn a task that may get cancelled by the provided [`CancellationToken`].
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If it is not desirable for the future to
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
pub fn spawn_with_cancel<T, F>(
    token: CancellationToken,
    f: F,
    future: T,
) -> JoinHandle<Option<T::Output>>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
    F: FnOnce(&T::Output) + Send + 'static,
{
    tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => {
                info!("received global kill signal");
                None
            }
            res = future => {
                f(&res);
                Some(res)
            }
        }
    })
}

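// Hedged usage sketch (added for illustration; not in the original file):
// cancelling the token resolves the handle with `None`, and the completion
// callback never runs. Assumes tokio's "macros" and "rt" features.
#[cfg(test)]
mod spawn_with_cancel_example {
    use super::spawn_with_cancel;
    use tokio_util::sync::CancellationToken;

    #[tokio::test]
    async fn cancellation_yields_none() {
        let token = CancellationToken::new();
        let handle = spawn_with_cancel(
            token.clone(),
            |_res| unreachable!("the pending future never resolves"),
            std::future::pending::<()>(),
        );
        token.cancel();
        assert!(handle.await.unwrap().is_none());
    }
}
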
/// The entrypoint to the binary.
///
/// Set up tracing, parse arguments, and start an http server.
pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
    // This channel is used to close old connections. When a new connection is
    // made, we send a message signalling to the old connection to close.
    let (sender, _) = tokio::sync::broadcast::channel::<()>(1);

    let app = Router::new()
        // This route gets upgraded to a websocket connection. We only support
        // one connection at a time, which we enforce by killing old connections
        // when we receive a new one.
        .route("/monitor", get(ws_handler))
        .with_state(ServerState {
            sender,
            token,
            args,
        });

    let addr = args.addr();
    let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail"))
        .with_context(|| format!("failed to bind to {addr}"))?;

    info!(addr, "server bound");

    bound
        .serve(app.into_make_service())
        .await
        .context("server exited")?;

    Ok(())
}

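// Hedged sketch (added for illustration; not in the original file) of how a
// binary might drive `start`: the `&'static Args` is typically obtained by
// leaking the parsed arguments. This helper is unused and illustrative only.
#[allow(dead_code)]
async fn example_entrypoint() -> anyhow::Result<()> {
    let args: &'static Args = Box::leak(Box::new(Args::parse()));
    start(args, CancellationToken::new()).await
}
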
/// Handles incoming websocket connections.
///
/// If we are already connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(ServerState {
        sender,
        token,
        args,
    }): State<ServerState>,
) -> Response {
    // Kill the old monitor
    info!("closing old connection if there is one");
    let _ = sender.send(());

    // Start the new one. Wow, the cycle of death and rebirth
    let closer = sender.subscribe();
    ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
}

/// Starts the monitor. If startup fails or the monitor exits, an error will
/// be logged and our internal state will be reset to allow for new connections.
#[tracing::instrument(skip_all)]
async fn start_monitor(
    ws: WebSocket,
    args: &Args,
    kill: broadcast::Receiver<()>,
    token: CancellationToken,
) {
    info!(
        ?args,
        "accepted new websocket connection -> starting monitor"
    );
    let timeout = Duration::from_secs(4);
    let monitor = tokio::time::timeout(
        timeout,
        Runner::new(Default::default(), args, ws, kill, token),
    )
    .await;
    let mut monitor = match monitor {
        Ok(Ok(monitor)) => monitor,
        Ok(Err(error)) => {
            error!(?error, "failed to create monitor");
            return;
        }
        Err(_) => {
            error!(
                ?timeout,
                "creating monitor timed out (probably waiting to receive protocol range)"
            );
            return;
        }
    };
    info!("connected to agent");

    match monitor.run().await {
        Ok(()) => info!("monitor was killed due to new connection"),
        Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
    }
}
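
// Illustrative sketch (not in the original file) of the nested-Result shape
// `start_monitor` matches on: `tokio::time::timeout` wraps the inner
// future's own `Result`, so success is the `Ok(Ok(..))` arm.
#[cfg(test)]
mod timeout_shape_example {
    use std::time::Duration;

    #[tokio::test]
    async fn timeout_wraps_inner_result() {
        let fut = async { Ok::<u32, anyhow::Error>(1) };
        let res = tokio::time::timeout(Duration::from_secs(4), fut).await;
        assert!(matches!(res, Ok(Ok(1))));
    }
}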
        
