LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - runner.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 69 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 43 0

            Line data    Source code
       1              : //! Exposes the `Runner`, which handles messages received from agent and
       2              : //! sends upscale requests.
       3              : //!
       4              : //! This is the "Monitor" part of the monitor binary and is the main entrypoint for
       5              : //! all functionality.
       6              : 
       7              : use std::sync::Arc;
       8              : use std::{fmt::Debug, mem};
       9              : 
      10              : use anyhow::{bail, Context};
      11              : use axum::extract::ws::{Message, WebSocket};
      12              : use futures::StreamExt;
      13              : use tokio::sync::broadcast;
      14              : use tokio::sync::mpsc;
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::{error, info, warn};
      17              : 
      18              : use crate::cgroup::{CgroupWatcher, MemoryLimits, Sequenced};
      19              : use crate::dispatcher::Dispatcher;
      20              : use crate::filecache::{FileCacheConfig, FileCacheState};
      21              : use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
      22              : use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
      23              : 
      24              : /// Central struct that interacts with agent, dispatcher, and cgroup to handle
      25              : /// signals from the agent.
      26            0 : #[derive(Debug)]
      27              : pub struct Runner {
      28              :     config: Config,
      29              :     filecache: Option<FileCacheState>,
      30              :     cgroup: Option<Arc<CgroupWatcher>>,
      31              :     dispatcher: Dispatcher,
      32              : 
      33              :     /// We "mint" new message ids by incrementing this counter and taking the value.
      34              :     ///
      35              :     /// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated
      36              :     /// by us vs the autoscaler-agent.
      37              :     counter: usize,
      38              : 
      39              :     /// A signal to kill the main thread produced by `self.run()`. This is triggered
      40              :     /// when the server receives a new connection. When the thread receives the
      41              :     /// signal off this channel, it will gracefully shutdown.
      42              :     kill: broadcast::Receiver<()>,
      43              : }
      44              : 
      45              : /// Configuration for a `Runner`
      46            0 : #[derive(Debug)]
      47              : pub struct Config {
      48              :     /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses before
      49              :     /// handing out the rest to userspace. This value is the estimated difference between the
      50              :     /// *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`.
      51              :     ///
      52              :     /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM
      53              :     /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)".
      54              :     ///
      55              :     /// We only use `sys_buffer_bytes` when calculating the system memory from the *external* memory
      56              :     /// size, rather than the self-reported memory size, according to the kernel.
      57              :     ///
      58              :     /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
      59              :     /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field
      60              :     /// should be removed once we have a better solution there.
      61              :     sys_buffer_bytes: u64,
      62              : }
      63              : 
      64              : impl Default for Config {
      65            0 :     fn default() -> Self {
      66            0 :         Self {
      67            0 :             sys_buffer_bytes: 100 * MiB,
      68            0 :         }
      69            0 :     }
      70              : }
      71              : 
      72              : impl Runner {
      73              :     /// Create a new monitor.
      74            0 :     #[tracing::instrument(skip_all, fields(?config, ?args))]
      75              :     pub async fn new(
      76              :         config: Config,
      77              :         args: &Args,
      78              :         ws: WebSocket,
      79              :         kill: broadcast::Receiver<()>,
      80              :         token: CancellationToken,
      81              :     ) -> anyhow::Result<Runner> {
      82              :         anyhow::ensure!(
      83              :             config.sys_buffer_bytes != 0,
      84              :             "invalid monitor Config: sys_buffer_bytes cannot be 0"
      85              :         );
      86              : 
      87              :         // *NOTE*: the dispatcher and cgroup manager talk through these channels
      88              :         // so make sure they each get the correct half, nothing is droppped, etc.
      89              :         let (notified_send, notified_recv) = mpsc::channel(1);
      90              :         let (requesting_send, requesting_recv) = mpsc::channel(1);
      91              : 
      92              :         let dispatcher = Dispatcher::new(ws, notified_send, requesting_recv)
      93              :             .await
      94              :             .context("error creating new dispatcher")?;
      95              : 
      96              :         let mut state = Runner {
      97              :             config,
      98              :             filecache: None,
      99              :             cgroup: None,
     100              :             dispatcher,
     101              :             counter: 1, // NB: must be odd, see the comment about the field for more.
     102              :             kill,
     103              :         };
     104              : 
     105              :         let mut file_cache_reserved_bytes = 0;
     106              :         let mem = get_total_system_memory();
     107              : 
     108              :         // We need to process file cache initialization before cgroup initialization, so that the memory
     109              :         // allocated to the file cache is appropriately taken into account when we decide the cgroup's
     110              :         // memory limits.
     111              :         if let Some(connstr) = &args.pgconnstr {
     112            0 :             info!("initializing file cache");
     113              :             let config = match args.file_cache_on_disk {
     114              :                 true => FileCacheConfig::default_on_disk(),
     115              :                 false => FileCacheConfig::default_in_memory(),
     116              :             };
     117              : 
     118              :             let mut file_cache = FileCacheState::new(connstr, config, token.clone())
     119              :                 .await
     120              :                 .context("failed to create file cache")?;
     121              : 
     122              :             let size = file_cache
     123              :                 .get_file_cache_size()
     124              :                 .await
     125              :                 .context("error getting file cache size")?;
     126              : 
     127              :             let new_size = file_cache.config.calculate_cache_size(mem);
     128            0 :             info!(
     129            0 :                 initial = bytes_to_mebibytes(size),
     130            0 :                 new = bytes_to_mebibytes(new_size),
     131            0 :                 "setting initial file cache size",
     132            0 :             );
     133              : 
     134              :             // note: even if size == new_size, we want to explicitly set it, just
     135              :             // to make sure that we have the permissions to do so
     136              :             let actual_size = file_cache
     137              :                 .set_file_cache_size(new_size)
     138              :                 .await
     139              :                 .context("failed to set file cache size, possibly due to inadequate permissions")?;
     140              :             if actual_size != new_size {
     141            0 :                 info!("file cache size actually got set to {actual_size}")
     142              :             }
     143              :             // Mark the resources given to the file cache as reserved, but only if it's in memory.
     144              :             if !args.file_cache_on_disk {
     145              :                 file_cache_reserved_bytes = actual_size;
     146              :             }
     147              : 
     148              :             state.filecache = Some(file_cache);
     149              :         }
     150              : 
     151              :         if let Some(name) = &args.cgroup {
     152              :             let (mut cgroup, cgroup_event_stream) =
     153              :                 CgroupWatcher::new(name.clone(), requesting_send)
     154              :                     .context("failed to create cgroup manager")?;
     155              : 
     156              :             let available = mem - file_cache_reserved_bytes;
     157              : 
     158              :             cgroup
     159              :                 .set_memory_limits(available)
     160              :                 .context("failed to set cgroup memory limits")?;
     161              : 
     162              :             let cgroup = Arc::new(cgroup);
     163              : 
     164              :             // Some might call this . . . cgroup v2
     165              :             let cgroup_clone = Arc::clone(&cgroup);
     166              : 
     167            0 :             spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
     168            0 :                 cgroup_clone.watch(notified_recv, cgroup_event_stream).await
     169            0 :             });
     170              : 
     171              :             state.cgroup = Some(cgroup);
     172              :         } else {
     173              :             // *NOTE*: We need to forget the sender so that its drop impl does not get ran.
     174              :             // This allows us to poll it in `Monitor::run` regardless of whether we
     175              :             // are managing a cgroup or not. If we don't forget it, all receives will
     176              :             // immediately return an error because the sender is droped and it will
     177              :             // claim all select! statements, effectively turning `Monitor::run` into
     178              :             // `loop { fail to receive }`.
     179              :             mem::forget(requesting_send);
     180              :         }
     181              : 
     182              :         Ok(state)
     183              :     }
     184              : 
     185              :     /// Attempt to downscale filecache + cgroup
     186            0 :     #[tracing::instrument(skip_all, fields(?target))]
     187              :     pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
     188              :         // Nothing to adjust
     189              :         if self.cgroup.is_none() && self.filecache.is_none() {
     190            0 :             info!("no action needed for downscale (no cgroup or file cache enabled)");
     191              :             return Ok((
     192              :                 true,
     193              :                 "monitor is not managing cgroup or file cache".to_string(),
     194              :             ));
     195              :         }
     196              : 
     197              :         let requested_mem = target.mem;
     198              :         let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
     199              :         let expected_file_cache_mem_usage = self
     200              :             .filecache
     201              :             .as_ref()
     202            0 :             .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
     203              :             .unwrap_or(0);
     204              :         let mut new_cgroup_mem_high = 0;
     205              :         if let Some(cgroup) = &self.cgroup {
     206              :             new_cgroup_mem_high = cgroup
     207              :                 .config
     208              :                 .calculate_memory_high_value(usable_system_memory - expected_file_cache_mem_usage);
     209              : 
     210              :             let current = cgroup
     211              :                 .current_memory_usage()
     212              :                 .context("failed to fetch cgroup memory")?;
     213              : 
     214              :             if new_cgroup_mem_high < current + cgroup.config.memory_high_buffer_bytes {
     215              :                 let status = format!(
     216              :                     "{}: {} MiB (new high) < {} (current usage) + {} (buffer)",
     217              :                     "calculated memory.high too low",
     218              :                     bytes_to_mebibytes(new_cgroup_mem_high),
     219              :                     bytes_to_mebibytes(current),
     220              :                     bytes_to_mebibytes(cgroup.config.memory_high_buffer_bytes)
     221              :                 );
     222              : 
     223            0 :                 info!(status, "discontinuing downscale");
     224              : 
     225              :                 return Ok((false, status));
     226              :             }
     227              :         }
     228              : 
     229              :         // The downscaling has been approved. Downscale the file cache, then the cgroup.
     230              :         let mut status = vec![];
     231              :         let mut file_cache_mem_usage = 0;
     232              :         if let Some(file_cache) = &mut self.filecache {
     233              :             let actual_usage = file_cache
     234              :                 .set_file_cache_size(expected_file_cache_mem_usage)
     235              :                 .await
     236              :                 .context("failed to set file cache size")?;
     237              :             if file_cache.config.in_memory {
     238              :                 file_cache_mem_usage = actual_usage;
     239              :             }
     240              :             let message = format!(
     241              :                 "set file cache size to {} MiB (in memory = {})",
     242              :                 bytes_to_mebibytes(actual_usage),
     243              :                 file_cache.config.in_memory,
     244              :             );
     245            0 :             info!("downscale: {message}");
     246              :             status.push(message);
     247              :         }
     248              : 
     249              :         if let Some(cgroup) = &self.cgroup {
     250              :             let available_memory = usable_system_memory - file_cache_mem_usage;
     251              : 
     252              :             if file_cache_mem_usage != expected_file_cache_mem_usage {
     253              :                 new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
     254              :             }
     255              : 
     256              :             let limits = MemoryLimits::new(
     257              :                 // new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
     258              :                 // since it is properly initialized in the previous cgroup if let block
     259              :                 new_cgroup_mem_high,
     260              :                 available_memory,
     261              :             );
     262              :             cgroup
     263              :                 .set_limits(&limits)
     264              :                 .context("failed to set cgroup memory limits")?;
     265              : 
     266              :             let message = format!(
     267              :                 "set cgroup memory.high to {} MiB, of new max {} MiB",
     268              :                 bytes_to_mebibytes(new_cgroup_mem_high),
     269              :                 bytes_to_mebibytes(available_memory)
     270              :             );
     271            0 :             info!("downscale: {message}");
     272              :             status.push(message);
     273              :         }
     274              : 
     275              :         // TODO: make this status thing less jank
     276              :         let status = status.join("; ");
     277              :         Ok((true, status))
     278              :     }
     279              : 
     280              :     /// Handle new resources
     281            0 :     #[tracing::instrument(skip_all, fields(?resources))]
     282              :     pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
     283              :         if self.filecache.is_none() && self.cgroup.is_none() {
     284            0 :             info!("no action needed for upscale (no cgroup or file cache enabled)");
     285              :             return Ok(());
     286              :         }
     287              : 
     288              :         let new_mem = resources.mem;
     289              :         let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
     290              : 
     291              :         // Get the file cache's expected contribution to the memory usage
     292              :         let mut file_cache_mem_usage = 0;
     293              :         if let Some(file_cache) = &mut self.filecache {
     294              :             let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
     295            0 :             info!(
     296            0 :                 target = bytes_to_mebibytes(expected_usage),
     297            0 :                 total = bytes_to_mebibytes(new_mem),
     298            0 :                 "updating file cache size",
     299            0 :             );
     300              : 
     301              :             let actual_usage = file_cache
     302              :                 .set_file_cache_size(expected_usage)
     303              :                 .await
     304              :                 .context("failed to set file cache size")?;
     305              :             if file_cache.config.in_memory {
     306              :                 file_cache_mem_usage = actual_usage;
     307              :             }
     308              : 
     309              :             if actual_usage != expected_usage {
     310            0 :                 warn!(
     311            0 :                     "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib",
     312            0 :                     bytes_to_mebibytes(expected_usage),
     313            0 :                     bytes_to_mebibytes(actual_usage)
     314            0 :                 )
     315              :             }
     316              :         }
     317              : 
     318              :         if let Some(cgroup) = &self.cgroup {
     319              :             let available_memory = usable_system_memory - file_cache_mem_usage;
     320              :             let new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
     321            0 :             info!(
     322            0 :                 target = bytes_to_mebibytes(new_cgroup_mem_high),
     323            0 :                 total = bytes_to_mebibytes(new_mem),
     324            0 :                 name = cgroup.path(),
     325            0 :                 "updating cgroup memory.high",
     326            0 :             );
     327              :             let limits = MemoryLimits::new(new_cgroup_mem_high, available_memory);
     328              :             cgroup
     329              :                 .set_limits(&limits)
     330              :                 .context("failed to set file cache size")?;
     331              :         }
     332              : 
     333              :         Ok(())
     334              :     }
     335              : 
     336              :     /// Take in a message and perform some action, such as downscaling or upscaling,
     337              :     /// and return a message to be send back.
     338            0 :     #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
     339              :     pub async fn process_message(
     340              :         &mut self,
     341              :         InboundMsg { inner, id }: InboundMsg,
     342              :     ) -> anyhow::Result<Option<OutboundMsg>> {
     343              :         match inner {
     344              :             InboundMsgKind::UpscaleNotification { granted } => {
     345              :                 self.handle_upscale(granted)
     346              :                     .await
     347              :                     .context("failed to handle upscale")?;
     348              :                 self.dispatcher
     349              :                     .notify_upscale(Sequenced::new(granted))
     350              :                     .await
     351              :                     .context("failed to notify notify cgroup of upscale")?;
     352              :                 Ok(Some(OutboundMsg::new(
     353              :                     OutboundMsgKind::UpscaleConfirmation {},
     354              :                     id,
     355              :                 )))
     356              :             }
     357              :             InboundMsgKind::DownscaleRequest { target } => self
     358              :                 .try_downscale(target)
     359              :                 .await
     360              :                 .context("failed to downscale")
     361            0 :                 .map(|(ok, status)| {
     362            0 :                     Some(OutboundMsg::new(
     363            0 :                         OutboundMsgKind::DownscaleResult { ok, status },
     364            0 :                         id,
     365            0 :                     ))
     366            0 :                 }),
     367              :             InboundMsgKind::InvalidMessage { error } => {
     368            0 :                 warn!(
     369            0 :                     %error, id, "received notification of an invalid message we sent"
     370            0 :                 );
     371              :                 Ok(None)
     372              :             }
     373              :             InboundMsgKind::InternalError { error } => {
     374            0 :                 warn!(error, id, "agent experienced an internal error");
     375              :                 Ok(None)
     376              :             }
     377              :             InboundMsgKind::HealthCheck {} => {
     378              :                 Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
     379              :             }
     380              :         }
     381              :     }
     382              : 
     383              :     // TODO: don't propagate errors, probably just warn!?
     384            0 :     #[tracing::instrument(skip_all)]
     385              :     pub async fn run(&mut self) -> anyhow::Result<()> {
     386            0 :         info!("starting dispatcher");
     387              :         loop {
     388            0 :             tokio::select! {
     389            0 :                 signal = self.kill.recv() => {
     390              :                     match signal {
     391              :                         Ok(()) => return Ok(()),
     392              :                         Err(e) => bail!("failed to receive kill signal: {e}")
     393              :                     }
     394              :                 }
     395              :                 // we need to propagate an upscale request
     396            0 :                 request = self.dispatcher.request_upscale_events.recv() => {
     397              :                     if request.is_none() {
     398              :                         bail!("failed to listen for upscale event from cgroup")
     399              :                     }
     400            0 :                     info!("cgroup asking for upscale; forwarding request");
     401              :                     self.counter += 2; // Increment, preserving parity (i.e. keep the
     402              :                                        // counter odd). See the field comment for more.
     403              :                     self.dispatcher
     404              :                         .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
     405              :                         .await
     406              :                         .context("failed to send message")?;
     407              :                 }
     408              :                 // there is a message from the agent
     409            0 :                 msg = self.dispatcher.source.next() => {
     410              :                     if let Some(msg) = msg {
     411              :                         // Don't use 'message' as a key as the string also uses
     412              :                         // that for its key
     413            0 :                         info!(?msg, "received message");
     414              :                         match msg {
     415              :                             Ok(msg) => {
     416              :                                 let message: InboundMsg = match msg {
     417              :                                     Message::Text(text) => {
     418              :                                         serde_json::from_str(&text).context("failed to deserialize text message")?
     419              :                                     }
     420              :                                     other => {
     421            0 :                                         warn!(
     422            0 :                                             // Don't use 'message' as a key as the
     423            0 :                                             // string also uses that for its key
     424            0 :                                             msg = ?other,
     425            0 :                                             "agent should only send text messages but received different type"
     426            0 :                                         );
     427              :                                         continue
     428              :                                     },
     429              :                                 };
     430              : 
     431              :                                 let out = match self.process_message(message.clone()).await {
     432              :                                     Ok(Some(out)) => out,
     433              :                                     Ok(None) => continue,
     434              :                                     Err(e) => {
     435              :                                         let error = e.to_string();
     436            0 :                                         warn!(?error, "error handling message");
     437              :                                         OutboundMsg::new(
     438              :                                             OutboundMsgKind::InternalError {
     439              :                                                 error
     440              :                                             },
     441              :                                             message.id
     442              :                                         )
     443              :                                     }
     444              :                                 };
     445              : 
     446              :                                 self.dispatcher
     447              :                                     .send(out)
     448              :                                     .await
     449              :                                     .context("failed to send message")?;
     450              :                             }
     451            0 :                             Err(e) => warn!("{e}"),
     452              :                         }
     453              :                     } else {
     454              :                         anyhow::bail!("dispatcher connection closed")
     455              :                     }
     456              :                 }
     457              :             }
     458              :         }
     459              :     }
     460              : }
        

Generated by: LCOV version 2.1-beta