       1              : #![deny(unsafe_code)]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : #![cfg(target_os = "linux")]
       4              : 
       5              : use anyhow::Context;
       6              : use axum::{
       7              :     extract::{ws::WebSocket, State, WebSocketUpgrade},
       8              :     response::Response,
       9              : };
      10              : use axum::{routing::get, Router, Server};
      11              : use clap::Parser;
      12              : use futures::Future;
      13              : use std::{fmt::Debug, time::Duration};
      14              : use sysinfo::{RefreshKind, System, SystemExt};
      15              : use tokio::{sync::broadcast, task::JoinHandle};
      16              : use tokio_util::sync::CancellationToken;
      17              : use tracing::{error, info};
      18              : 
      19              : use runner::Runner;
      20              : 
      21              : // Code that interfaces with agent
      22              : pub mod dispatcher;
      23              : pub mod protocol;
      24              : 
      25              : pub mod cgroup;
      26              : pub mod filecache;
      27              : pub mod runner;
      28              : 
      29              : /// The vm-monitor is an autoscaling component started by compute_ctl.
      30              : ///
      31              : /// It carries out autoscaling decisions (upscaling/downscaling) and responds to
      32              : /// memory pressure by making requests to the autoscaler-agent.
      33            0 : #[derive(Debug, Parser)]
      34              : pub struct Args {
      35              :     /// The name of the cgroup we should monitor for memory.high events. This
      36              :     /// is the cgroup that postgres should be running in.
      37              :     #[arg(short, long)]
      38              :     pub cgroup: Option<String>,
      39              : 
      40              :     /// The connection string for the Postgres file cache we should manage.
      41              :     #[arg(short, long)]
      42              :     pub pgconnstr: Option<String>,
      43              : 
      44              :     /// The address we should listen on for connection requests. For the
      45              :     /// agent, this is For the informant, this is
      46              :     #[arg(short, long)]
      47            0 :     pub addr: String,
      48              : }
      49              : 
      50              : impl Args {
      51            0 :     pub fn addr(&self) -> &str {
      52            0 :         &self.addr
      53            0 :     }
      54              : }
      55              : 
      56              : /// The number of bytes in one mebibyte.
      57              : #[allow(non_upper_case_globals)]
      58              : const MiB: u64 = 1 << 20;
      59              : 
      60              : /// Convert a quantity in bytes to a quantity in mebibytes, generally for display
      61              : /// purposes. (Most calculations in this crate use bytes directly)
      62            0 : pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
      63            0 :     (bytes as f32) / (MiB as f32)
      64            0 : }
      65              : 
      66            0 : pub fn get_total_system_memory() -> u64 {
      67            0 :     System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
      68            0 : }
      69              : 
      70              : /// Global app state for the Axum server
      71              : #[derive(Debug, Clone)]
      72              : pub struct ServerState {
      73              :     /// Used to close old connections.
      74              :     ///
      75              :     /// When a new connection is made, we send a message signalling to the old
      76              :     /// connection to close.
      77              :     pub sender: broadcast::Sender<()>,
      78              : 
      79              :     /// Used to cancel all spawned threads in the monitor.
      80              :     pub token: CancellationToken,
      81              : 
      82              :     // The CLI args
      83              :     pub args: &'static Args,
      84              : }
      85              : 
      86              : /// Spawn a thread that may get cancelled by the provided [`CancellationToken`].
      87              : ///
      88              : /// This is mainly meant to be called with futures that will be pending for a very
      89              : /// long time, or are not mean to return. If it is not desirable for the future to
      90              : /// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
      91              : /// be logged with `f`.
      92            0 : pub fn spawn_with_cancel<T, F>(
      93            0 :     token: CancellationToken,
      94            0 :     f: F,
      95            0 :     future: T,
      96            0 : ) -> JoinHandle<Option<T::Output>>
      97            0 : where
      98            0 :     T: Future + Send + 'static,
      99            0 :     T::Output: Send + 'static,
     100            0 :     F: FnOnce(&T::Output) + Send + 'static,
     101            0 : {
     102            0 :     tokio::spawn(async move {
     103              :         tokio::select! {
     104              :             _ = token.cancelled() => {
     105              :                 info!("received global kill signal");
     106              :                 None
     107              :             }
     108              :             res = future => {
     109              :                 f(&res);
     110              :                 Some(res)
     111              :             }
     112              :         }
     113            0 :     })
     114            0 : }
     115              : 
     116              : /// The entrypoint to the binary.
     117              : ///
     118              : /// Set up tracing, parse arguments, and start an http server.
     119            0 : pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
     120            0 :     // This channel is used to close old connections. When a new connection is
     121            0 :     // made, we send a message signalling to the old connection to close.
     122            0 :     let (sender, _) = tokio::sync::broadcast::channel::<()>(1);
     123            0 : 
     124            0 :     let app = Router::new()
     125            0 :         // This route gets upgraded to a websocket connection. We only support
     126            0 :         // one connection at a time, which we enforce by killing old connections
     127            0 :         // when we receive a new one.
     128            0 :         .route("/monitor", get(ws_handler))
     129            0 :         .with_state(ServerState {
     130            0 :             sender,
     131            0 :             token,
     132            0 :             args,
     133            0 :         });
     134            0 : 
     135            0 :     let addr = args.addr();
     136            0 :     let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail"))
     137            0 :         .with_context(|| format!("failed to bind to {addr}"))?;
     138              : 
     139            0 :     info!(addr, "server bound");
     140              : 
     141            0 :     bound
     142            0 :         .serve(app.into_make_service())
     143            0 :         .await
     144            0 :         .context("server exited")?;
     145              : 
     146            0 :     Ok(())
     147            0 : }
     148              : 
     149              : /// Handles incoming websocket connections.
     150              : ///
     151              : /// If we are already to connected to an agent, we kill that old connection
     152              : /// and accept the new one.
     153            0 : #[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
     154              : pub async fn ws_handler(
     155              :     ws: WebSocketUpgrade,
     156              :     State(ServerState {
     157              :         sender,
     158              :         token,
     159              :         args,
     160              :     }): State<ServerState>,
     161              : ) -> Response {
     162              :     // Kill the old monitor
     163              :     info!("closing old connection if there is one");
     164              :     let _ = sender.send(());
     165              : 
     166              :     // Start the new one. Wow, the cycle of death and rebirth
     167              :     let closer = sender.subscribe();
     168            0 :     ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
     169              : }
     170              : 
     171              : /// Starts the monitor. If startup fails or the monitor exits, an error will
     172              : /// be logged and our internal state will be reset to allow for new connections.
     173            0 : #[tracing::instrument(skip_all)]
     174              : async fn start_monitor(
     175              :     ws: WebSocket,
     176              :     args: &Args,
     177              :     kill: broadcast::Receiver<()>,
     178              :     token: CancellationToken,
     179              : ) {
     180              :     info!(
     181              :         ?args,
     182              :         "accepted new websocket connection -> starting monitor"
     183              :     );
     184              :     let timeout = Duration::from_secs(4);
     185              :     let monitor = tokio::time::timeout(
     186              :         timeout,
     187              :         Runner::new(Default::default(), args, ws, kill, token),
     188              :     )
     189              :     .await;
     190              :     let mut monitor = match monitor {
     191              :         Ok(Ok(monitor)) => monitor,
     192              :         Ok(Err(error)) => {
     193              :             error!(?error, "failed to create monitor");
     194              :             return;
     195              :         }
     196              :         Err(_) => {
     197              :             error!(
     198              :                 ?timeout,
     199              :                 "creating monitor timed out (probably waiting to receive protocol range)"
     200              :             );
     201              :             return;
     202              :         }
     203              :     };
     204              :     info!("connected to agent");
     205              : 
     206              :     match {
     207              :         Ok(()) => info!("monitor was killed due to new connection"),
     208              :         Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
     209              :     }
     210              : }

