// libs/vm_monitor/src/lib.rs
#![cfg(target_os = "linux")]

use anyhow::Context;
use axum::{
    extract::{ws::WebSocket, State, WebSocketUpgrade},
    response::Response,
};
use axum::{routing::get, Router, Server};
use clap::Parser;
use futures::Future;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with agent
pub mod dispatcher;
pub mod protocol;

pub mod cgroup;
pub mod filecache;
pub mod runner;

/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
#[derive(Debug, Parser)]
pub struct Args {
    /// The name of the cgroup we should monitor for memory.high events. This
    /// is the cgroup that postgres should be running in.
    #[arg(short, long)]
    pub cgroup: Option<String>,

    /// The connection string for the Postgres file cache we should manage.
    #[arg(short, long)]
    pub pgconnstr: Option<String>,

    /// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
    /// kernel's page cache), and therefore should not count against available memory.
    //
    // NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
    // than a roundabout way, via whether it's on disk), but in order to be backwards compatible
    // during the switch away from an in-memory file cache, we had to default to the previous
    // behavior.
    #[arg(long)]
    pub file_cache_on_disk: bool,

    /// The address we should listen on for connection requests. For the
    /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
    #[arg(short, long)]
    pub addr: String,
}

impl Args {
    pub fn addr(&self) -> &str {
        &self.addr
    }
}
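
// Illustrative sketch (not part of this crate's real wiring): with `clap::Parser`
// derived on `Args` above, a hypothetical caller would parse and read these flags
// roughly like so:
//
//     let args = Args::parse();
//     // e.g. `vm-monitor --addr 0.0.0.0:10301 --cgroup neon-postgres`
//     info!(addr = args.addr(), cgroup = ?args.cgroup, "parsed CLI arguments");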

/// The number of bytes in one mebibyte.
#[allow(non_upper_case_globals)]
const MiB: u64 = 1 << 20;

/// Convert a quantity in bytes to a quantity in mebibytes, generally for display
/// purposes. (Most calculations in this crate use bytes directly)
pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
    (bytes as f32) / (MiB as f32)
}
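
// For example: `bytes_to_mebibytes(5 * MiB)` yields `5.0`, and
// `bytes_to_mebibytes(1536 * 1024)` yields `1.5`; useful when logging values
// that are otherwise tracked in raw bytes.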

pub fn get_total_system_memory() -> u64 {
    System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}

/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
    /// Used to close old connections.
    ///
    /// When a new connection is made, we send a message signalling to the old
    /// connection to close.
    pub sender: broadcast::Sender<()>,

    /// Used to cancel all spawned tasks in the monitor.
    pub token: CancellationToken,

    // The CLI args
    pub args: &'static Args,
}

/// Spawn a task that may get cancelled by the provided [`CancellationToken`].
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If it is not desirable for the future to
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
pub fn spawn_with_cancel<T, F>(
    token: CancellationToken,
    f: F,
    future: T,
) -> JoinHandle<Option<T::Output>>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
    F: FnOnce(&T::Output) + Send + 'static,
{
    tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => {
                info!("received global kill signal");
                None
            }
            res = future => {
                f(&res);
                Some(res)
            }
        }
    })
}
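
// Usage sketch (hypothetical, not taken from elsewhere in this crate): run a
// long-lived future under the shared token, logging if it ever resolves on its
// own. Cancelling the token resolves the join handle with `None` instead.
//
//     let token = CancellationToken::new();
//     let handle = spawn_with_cancel(
//         token.clone(),
//         |res| error!(?res, "background future returned unexpectedly"),
//         std::future::pending::<()>(),
//     );
//     // ...later, during shutdown:
//     token.cancel();
//     // `handle.await` now yields `Ok(None)`.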

/// The entrypoint to the binary.
///
/// Set up tracing, parse arguments, and start an http server.
pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
    // This channel is used to close old connections. When a new connection is
    // made, we send a message signalling to the old connection to close.
    let (sender, _) = tokio::sync::broadcast::channel::<()>(1);

    let app = Router::new()
        // This route gets upgraded to a websocket connection. We only support
        // one connection at a time, which we enforce by killing old connections
        // when we receive a new one.
        .route("/monitor", get(ws_handler))
        .with_state(ServerState {
            sender,
            token,
            args,
        });

    let addr = args.addr();
    let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail"))
        .with_context(|| format!("failed to bind to {addr}"))?;

    info!(addr, "server bound");

    bound
        .serve(app.into_make_service())
        .await
        .context("server exited")?;

    Ok(())
}
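
// Hypothetical caller sketch (the real call site lives in compute_ctl and is
// not shown here): `start` needs a `&'static Args`, which a caller can get by
// leaking the parsed arguments, plus a `CancellationToken` kept for shutdown.
//
//     let args: &'static Args = Box::leak(Box::new(Args::parse()));
//     let token = CancellationToken::new();
//     start(args, token.clone()).await?;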

/// Handles incoming websocket connections.
///
/// If we are already connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(ServerState {
        sender,
        token,
        args,
    }): State<ServerState>,
) -> Response {
    // Kill the old monitor
    info!("closing old connection if there is one");
    let _ = sender.send(());

    // Start the new one. Wow, the cycle of death and rebirth
    let closer = sender.subscribe();
    ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
}

/// Starts the monitor. If startup fails or the monitor exits, an error will
/// be logged and our internal state will be reset to allow for new connections.
#[tracing::instrument(skip_all, fields(?args))]
async fn start_monitor(
    ws: WebSocket,
    args: &Args,
    kill: broadcast::Receiver<()>,
    token: CancellationToken,
) {
    info!("accepted new websocket connection -> starting monitor");
    let timeout = Duration::from_secs(4);
    let monitor = tokio::time::timeout(
        timeout,
        Runner::new(Default::default(), args, ws, kill, token),
    )
    .await;
    let mut monitor = match monitor {
        Ok(Ok(monitor)) => monitor,
        Ok(Err(error)) => {
            error!(?error, "failed to create monitor");
            return;
        }
        Err(_) => {
            error!(
                ?timeout,
                "creating monitor timed out (probably waiting to receive protocol range)"
            );
            return;
        }
    };
    info!("connected to agent");

    match monitor.run().await {
        Ok(()) => info!("monitor was killed due to new connection"),
        Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
    }
}
