LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - runner.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 0.0 % 29 0
Test Date: 2024-11-13 18:23:39 Functions: 0.0 % 11 0

            Line data    Source code
       1              : //! Exposes the `Runner`, which handles messages received from agent and
       2              : //! sends upscale requests.
       3              : //!
       4              : //! This is the "Monitor" part of the monitor binary and is the main entrypoint for
       5              : //! all functionality.
       6              : 
       7              : use std::fmt::Debug;
       8              : use std::time::{Duration, Instant};
       9              : 
      10              : use anyhow::{bail, Context};
      11              : use axum::extract::ws::{Message, WebSocket};
      12              : use futures::StreamExt;
      13              : use tokio::sync::{broadcast, watch};
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::{debug, error, info, warn};
      16              : 
      17              : use crate::cgroup::{self, CgroupWatcher};
      18              : use crate::dispatcher::Dispatcher;
      19              : use crate::filecache::{FileCacheConfig, FileCacheState};
      20              : use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
      21              : use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
      22              : 
      23              : /// Central struct that interacts with agent, dispatcher, and cgroup to handle
      24              : /// signals from the agent.
      25              : #[derive(Debug)]
      26              : pub struct Runner {
      27              :     config: Config,
      28              :     filecache: Option<FileCacheState>,
      29              :     cgroup: Option<CgroupState>,
      30              :     dispatcher: Dispatcher,
      31              : 
      32              :     /// We "mint" new message ids by incrementing this counter and taking the value.
      33              :     ///
      34              :     /// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated
      35              :     /// by us vs the autoscaler-agent.
      36              :     counter: usize,
      37              : 
      38              :     last_upscale_request_at: Option<Instant>,
      39              : 
      40              :     /// A signal to kill the main thread produced by `self.run()`. This is triggered
      41              :     /// when the server receives a new connection. When the thread receives the
      42              :     /// signal off this channel, it will gracefully shutdown.
      43              :     kill: broadcast::Receiver<()>,
      44              : }
      45              : 
      46              : #[derive(Debug)]
      47              : struct CgroupState {
      48              :     watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
      49              :     /// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
      50              :     /// requests.
      51              :     threshold: u64,
      52              : }
      53              : 
      54              : /// Configuration for a `Runner`
      55              : #[derive(Debug)]
      56              : pub struct Config {
      57              :     /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses before
      58              :     /// handing out the rest to userspace. This value is the estimated difference between the
      59              :     /// *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`.
      60              :     ///
      61              :     /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM
      62              :     /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)".
      63              :     ///
      64              :     /// We only use `sys_buffer_bytes` when calculating the system memory from the *external* memory
      65              :     /// size, rather than the self-reported memory size, according to the kernel.
      66              :     ///
      67              :     /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
      68              :     /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field
      69              :     /// should be removed once we have a better solution there.
      70              :     sys_buffer_bytes: u64,
      71              : 
      72              :     /// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
      73              :     /// other words, providing a ceiling for the highest value of the threshold by enforcing that
      74              :     /// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
      75              :     /// threshold.
      76              :     ///
      77              :     /// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
      78              :     /// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
      79              :     /// memory.
      80              :     ///
      81              :     /// The default value of `0.15` means that we *guarantee* sending upscale requests if the
      82              :     /// cgroup is using more than 85% of total memory.
      83              :     cgroup_min_overhead_fraction: f64,
      84              : 
      85              :     cgroup_downscale_threshold_buffer_bytes: u64,
      86              : }
      87              : 
      88              : impl Default for Config {
      89            0 :     fn default() -> Self {
      90            0 :         Self {
      91            0 :             sys_buffer_bytes: 100 * MiB,
      92            0 :             cgroup_min_overhead_fraction: 0.15,
      93            0 :             cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
      94            0 :         }
      95            0 :     }
      96              : }
      97              : 
      98              : impl Config {
      99            0 :     fn cgroup_threshold(&self, total_mem: u64) -> u64 {
     100            0 :         // We want our threshold to be met gracefully instead of letting postgres get OOM-killed
     101            0 :         // (or if there's room, spilling to swap).
     102            0 :         // So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
     103            0 :         // remaining above the threshold.
     104            0 :         (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64
     105            0 :     }
     106              : }
     107              : 
     108              : impl Runner {
     109              :     /// Create a new monitor.
     110            0 :     #[tracing::instrument(skip_all, fields(?config, ?args))]
     111              :     pub async fn new(
     112              :         config: Config,
     113              :         args: &Args,
     114              :         ws: WebSocket,
     115              :         kill: broadcast::Receiver<()>,
     116              :         token: CancellationToken,
     117              :     ) -> anyhow::Result<Runner> {
     118              :         anyhow::ensure!(
     119              :             config.sys_buffer_bytes != 0,
     120              :             "invalid monitor Config: sys_buffer_bytes cannot be 0"
     121              :         );
     122              : 
     123              :         let dispatcher = Dispatcher::new(ws)
     124              :             .await
     125              :             .context("error creating new dispatcher")?;
     126              : 
     127              :         let mut state = Runner {
     128              :             config,
     129              :             filecache: None,
     130              :             cgroup: None,
     131              :             dispatcher,
     132              :             counter: 1, // NB: must be odd, see the comment about the field for more.
     133              :             last_upscale_request_at: None,
     134              :             kill,
     135              :         };
     136              : 
     137              :         let mem = get_total_system_memory();
     138              : 
     139              :         if let Some(connstr) = &args.pgconnstr {
     140              :             info!("initializing file cache");
     141              :             let config = FileCacheConfig::default();
     142              : 
     143              :             let mut file_cache = FileCacheState::new(connstr, config, token.clone())
     144              :                 .await
     145              :                 .context("failed to create file cache")?;
     146              : 
     147              :             let size = file_cache
     148              :                 .get_file_cache_size()
     149              :                 .await
     150              :                 .context("error getting file cache size")?;
     151              : 
     152              :             let new_size = file_cache.config.calculate_cache_size(mem);
     153              :             info!(
     154              :                 initial = bytes_to_mebibytes(size),
     155              :                 new = bytes_to_mebibytes(new_size),
     156              :                 "setting initial file cache size",
     157              :             );
     158              : 
     159              :             // note: even if size == new_size, we want to explicitly set it, just
     160              :             // to make sure that we have the permissions to do so
     161              :             let actual_size = file_cache
     162              :                 .set_file_cache_size(new_size)
     163              :                 .await
     164              :                 .context("failed to set file cache size, possibly due to inadequate permissions")?;
     165              :             if actual_size != new_size {
     166              :                 info!("file cache size actually got set to {actual_size}")
     167              :             }
     168              : 
     169              :             state.filecache = Some(file_cache);
     170              :         }
     171              : 
     172              :         if let Some(name) = &args.cgroup {
     173              :             // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
     174              :             // now, and then set limits later.
     175              :             info!("initializing cgroup");
     176              : 
     177              :             let cgroup =
     178              :                 CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;
     179              : 
     180              :             let init_value = cgroup::MemoryHistory {
     181              :                 avg_non_reclaimable: 0,
     182              :                 samples_count: 0,
     183              :                 samples_span: Duration::ZERO,
     184              :             };
     185              :             let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));
     186              : 
     187            0 :             spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
     188            0 :                 cgroup.watch(hist_tx).await
     189            0 :             });
     190              : 
     191              :             let threshold = state.config.cgroup_threshold(mem);
     192              :             info!(threshold, "set initial cgroup threshold",);
     193              : 
     194              :             state.cgroup = Some(CgroupState {
     195              :                 watcher: hist_rx,
     196              :                 threshold,
     197              :             });
     198              :         }
     199              : 
     200              :         Ok(state)
     201              :     }
     202              : 
     203              :     /// Attempt to downscale filecache + cgroup
     204            0 :     #[tracing::instrument(skip_all, fields(?target))]
     205              :     pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
     206              :         // Nothing to adjust
     207              :         if self.cgroup.is_none() && self.filecache.is_none() {
     208              :             info!("no action needed for downscale (no cgroup or file cache enabled)");
     209              :             return Ok((
     210              :                 true,
     211              :                 "monitor is not managing cgroup or file cache".to_string(),
     212              :             ));
     213              :         }
     214              : 
     215              :         let requested_mem = target.mem;
     216              :         let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
     217              :         let expected_file_cache_size = self
     218              :             .filecache
     219              :             .as_ref()
     220            0 :             .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
     221              :             .unwrap_or(0);
     222              :         if let Some(cgroup) = &self.cgroup {
     223              :             let (last_time, last_history) = *cgroup.watcher.borrow();
     224              : 
     225              :             // NB: The ordering of these conditions is intentional. During startup, we should deny
     226              :             // downscaling until we have enough information to determine that it's safe to do so
     227              :             // (i.e. enough samples have come in). But if it's been a while and we *still* haven't
     228              :             // received any information, we should *fail* instead of just denying downscaling.
     229              :             //
     230              :             // `last_time` is set to `Instant::now()` on startup, so checking `last_time.elapsed()`
     231              :             // serves double-duty: it trips if we haven't received *any* metrics for long enough,
     232              :             // OR if we haven't received metrics *recently enough*.
     233              :             //
     234              :             // TODO: make the duration here configurable.
     235              :             if last_time.elapsed() > Duration::from_secs(5) {
     236              :                 bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
     237              :             } else if last_history.samples_count <= 1 {
     238              :                 let status = "haven't received enough cgroup memory stats yet";
     239              :                 info!(status, "discontinuing downscale");
     240              :                 return Ok((false, status.to_owned()));
     241              :             }
     242              : 
     243              :             let new_threshold = self.config.cgroup_threshold(usable_system_memory);
     244              : 
     245              :             let current = last_history.avg_non_reclaimable;
     246              : 
     247              :             if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
     248              :                 let status = format!(
     249              :                     "{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
     250              :                     "calculated memory threshold too low",
     251              :                     bytes_to_mebibytes(new_threshold),
     252              :                     bytes_to_mebibytes(current),
     253              :                     bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
     254              :                 );
     255              : 
     256              :                 info!(status, "discontinuing downscale");
     257              : 
     258              :                 return Ok((false, status));
     259              :             }
     260              :         }
     261              : 
     262              :         // The downscaling has been approved. Downscale the file cache, then the cgroup.
     263              :         let mut status = vec![];
     264              :         if let Some(file_cache) = &mut self.filecache {
     265              :             let actual_usage = file_cache
     266              :                 .set_file_cache_size(expected_file_cache_size)
     267              :                 .await
     268              :                 .context("failed to set file cache size")?;
     269              :             let message = format!(
     270              :                 "set file cache size to {} MiB",
     271              :                 bytes_to_mebibytes(actual_usage),
     272              :             );
     273              :             info!("downscale: {message}");
     274              :             status.push(message);
     275              :         }
     276              : 
     277              :         if let Some(cgroup) = &mut self.cgroup {
     278              :             let new_threshold = self.config.cgroup_threshold(usable_system_memory);
     279              : 
     280              :             let message = format!(
     281              :                 "set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
     282              :                 bytes_to_mebibytes(cgroup.threshold),
     283              :                 bytes_to_mebibytes(new_threshold),
     284              :                 bytes_to_mebibytes(usable_system_memory)
     285              :             );
     286              :             cgroup.threshold = new_threshold;
     287              :             info!("downscale: {message}");
     288              :             status.push(message);
     289              :         }
     290              : 
     291              :         // TODO: make this status thing less jank
     292              :         let status = status.join("; ");
     293              :         Ok((true, status))
     294              :     }
     295              : 
     296              :     /// Handle new resources
     297            0 :     #[tracing::instrument(skip_all, fields(?resources))]
     298              :     pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
     299              :         if self.filecache.is_none() && self.cgroup.is_none() {
     300              :             info!("no action needed for upscale (no cgroup or file cache enabled)");
     301              :             return Ok(());
     302              :         }
     303              : 
     304              :         let new_mem = resources.mem;
     305              :         let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
     306              : 
     307              :         if let Some(file_cache) = &mut self.filecache {
     308              :             let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
     309              :             info!(
     310              :                 target = bytes_to_mebibytes(expected_usage),
     311              :                 total = bytes_to_mebibytes(new_mem),
     312              :                 "updating file cache size",
     313              :             );
     314              : 
     315              :             let actual_usage = file_cache
     316              :                 .set_file_cache_size(expected_usage)
     317              :                 .await
     318              :                 .context("failed to set file cache size")?;
     319              : 
     320              :             if actual_usage != expected_usage {
     321              :                 warn!(
     322              :                     "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib",
     323              :                     bytes_to_mebibytes(expected_usage),
     324              :                     bytes_to_mebibytes(actual_usage)
     325              :                 )
     326              :             }
     327              :         }
     328              : 
     329              :         if let Some(cgroup) = &mut self.cgroup {
     330              :             let new_threshold = self.config.cgroup_threshold(usable_system_memory);
     331              : 
     332              :             info!(
     333              :                 "set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
     334              :                 bytes_to_mebibytes(cgroup.threshold),
     335              :                 bytes_to_mebibytes(new_threshold),
     336              :                 bytes_to_mebibytes(usable_system_memory)
     337              :             );
     338              :             cgroup.threshold = new_threshold;
     339              :         }
     340              : 
     341              :         Ok(())
     342              :     }
     343              : 
     344              :     /// Take in a message and perform some action, such as downscaling or upscaling,
     345              :     /// and return a message to be send back.
     346            0 :     #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
     347              :     pub async fn process_message(
     348              :         &mut self,
     349              :         InboundMsg { inner, id }: InboundMsg,
     350              :     ) -> anyhow::Result<Option<OutboundMsg>> {
     351              :         match inner {
     352              :             InboundMsgKind::UpscaleNotification { granted } => {
     353              :                 self.handle_upscale(granted)
     354              :                     .await
     355              :                     .context("failed to handle upscale")?;
     356              :                 Ok(Some(OutboundMsg::new(
     357              :                     OutboundMsgKind::UpscaleConfirmation {},
     358              :                     id,
     359              :                 )))
     360              :             }
     361              :             InboundMsgKind::DownscaleRequest { target } => self
     362              :                 .try_downscale(target)
     363              :                 .await
     364              :                 .context("failed to downscale")
     365            0 :                 .map(|(ok, status)| {
     366            0 :                     Some(OutboundMsg::new(
     367            0 :                         OutboundMsgKind::DownscaleResult { ok, status },
     368            0 :                         id,
     369            0 :                     ))
     370            0 :                 }),
     371              :             InboundMsgKind::InvalidMessage { error } => {
     372              :                 warn!(
     373              :                     %error, id, "received notification of an invalid message we sent"
     374              :                 );
     375              :                 Ok(None)
     376              :             }
     377              :             InboundMsgKind::InternalError { error } => {
     378              :                 warn!(error, id, "agent experienced an internal error");
     379              :                 Ok(None)
     380              :             }
     381              :             InboundMsgKind::HealthCheck {} => {
     382              :                 Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
     383              :             }
     384              :         }
     385              :     }
     386              : 
     387              :     // TODO: don't propagate errors, probably just warn!?
     388            0 :     #[tracing::instrument(skip_all)]
     389              :     pub async fn run(&mut self) -> anyhow::Result<()> {
     390              :         info!("starting dispatcher");
     391              :         loop {
     392              :             tokio::select! {
     393              :                 signal = self.kill.recv() => {
     394              :                     match signal {
     395              :                         Ok(()) => return Ok(()),
     396              :                         Err(e) => bail!("failed to receive kill signal: {e}")
     397              :                     }
     398              :                 }
     399              : 
     400              :                 // New memory stats from the cgroup, *may* need to request upscaling, if we've
     401              :                 // exceeded the threshold
     402              :                 result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
     403              :                     result.context("failed to receive from cgroup memory stats watcher")?;
     404              : 
     405              :                     let cgroup = self.cgroup.as_ref().unwrap();
     406              : 
     407              :                     let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
     408              : 
     409              :                     // If we haven't exceeded the threshold, then we're all ok
     410              :                     if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
     411              :                         continue;
     412              :                     }
     413              : 
     414              :                     // Otherwise, we generally want upscaling. But, if it's been less than 1 second
     415              :                     // since the last time we requested upscaling, ignore the event, to avoid
     416              :                     // spamming the agent.
     417              :                     if let Some(t) = self.last_upscale_request_at {
     418              :                         let elapsed = t.elapsed();
     419              :                         if elapsed < Duration::from_secs(1) {
     420              :                             // *Ideally* we'd like to log here that we're ignoring the fact the
     421              :                             // memory stats are too high, but in practice this can result in
     422              :                             // spamming the logs with repetitive messages about ignoring the signal
     423              :                             //
     424              :                             // See https://github.com/neondatabase/neon/issues/5865 for more.
     425              :                             continue;
     426              :                         }
     427              :                     }
     428              : 
     429              :                     self.last_upscale_request_at = Some(Instant::now());
     430              : 
     431              :                     info!(
     432              :                         avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
     433              :                         threshold = bytes_to_mebibytes(cgroup.threshold),
     434              :                         "cgroup memory stats are high enough to upscale, requesting upscale",
     435              :                     );
     436              : 
     437              :                     self.counter += 2; // Increment, preserving parity (i.e. keep the
     438              :                                        // counter odd). See the field comment for more.
     439              :                     self.dispatcher
     440              :                         .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
     441              :                         .await
     442              :                         .context("failed to send message")?;
     443              :                 },
     444              : 
     445              :                 // there is a message from the agent
     446              :                 msg = self.dispatcher.source.next() => {
     447              :                     if let Some(msg) = msg {
     448              :                         match &msg {
     449              :                             Ok(msg) => {
     450              :                                 let message: InboundMsg = match msg {
     451              :                                     Message::Text(text) => {
     452              :                                         serde_json::from_str(text).context("failed to deserialize text message")?
     453              :                                     }
     454              :                                     other => {
     455              :                                         warn!(
     456              :                                             // Don't use 'message' as a key as the
     457              :                                             // string also uses that for its key
     458              :                                             msg = ?other,
     459              :                                             "problem processing incoming message: agent should only send text messages but received different type"
     460              :                                         );
     461              :                                         continue
     462              :                                     },
     463              :                                 };
     464              : 
     465              :                                 if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
     466              :                                     debug!(?msg, "received message");
     467              :                                 } else {
     468              :                                     info!(?msg, "received message");
     469              :                                 }
     470              : 
     471              :                                 let out = match self.process_message(message.clone()).await {
     472              :                                     Ok(Some(out)) => out,
     473              :                                     Ok(None) => continue,
     474              :                                     Err(e) => {
     475              :                                         // use {:#} for our logging because the display impl only
     476              :                                         // gives the outermost cause, and the debug impl
     477              :                                         // pretty-prints the error, whereas {:#} contains all the
     478              :                                         // causes, but is compact (no newlines).
     479              :                                         warn!(error = format!("{e:#}"), "error handling message");
     480              :                                         OutboundMsg::new(
     481              :                                             OutboundMsgKind::InternalError {
     482              :                                                 error: e.to_string(),
     483              :                                             },
     484              :                                             message.id
     485              :                                         )
     486              :                                     }
     487              :                                 };
     488              : 
     489              :                                 self.dispatcher
     490              :                                     .send(out)
     491              :                                     .await
     492              :                                     .context("failed to send message")?;
     493              :                             }
     494              :                             Err(e) => warn!(
     495              :                                 error = format!("{e}"),
     496              :                                 msg = ?msg,
     497              :                                 "received error message"
     498              :                             ),
     499              :                         }
     500              :                     } else {
     501              :                         anyhow::bail!("dispatcher connection closed")
     502              :                     }
     503              :                 }
     504              :             }
     505              :         }
     506              :     }
     507              : }
        

Generated by: LCOV version 2.1-beta