LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - runner.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 0.0 % 24 0
Test Date: 2025-03-12 00:01:28 Functions: 0.0 % 6 0

            Line data    Source code
       1              : //! Exposes the `Runner`, which handles messages received from agent and
       2              : //! sends upscale requests.
       3              : //!
       4              : //! This is the "Monitor" part of the monitor binary and is the main entrypoint for
       5              : //! all functionality.
       6              : 
       7              : use std::fmt::Debug;
       8              : use std::time::{Duration, Instant};
       9              : 
      10              : use anyhow::{Context, bail};
      11              : use axum::extract::ws::{Message, WebSocket};
      12              : use futures::StreamExt;
      13              : use tokio::sync::{broadcast, watch};
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::{debug, error, info, warn};
      16              : 
      17              : use crate::cgroup::{self, CgroupWatcher};
      18              : use crate::dispatcher::Dispatcher;
      19              : use crate::filecache::{FileCacheConfig, FileCacheState};
      20              : use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
      21              : use crate::{Args, MiB, bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel};
      22              : 
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
#[derive(Debug)]
pub struct Runner {
    /// Tunables used when converting memory sizes into file cache sizes and cgroup thresholds.
    config: Config,
    /// File cache state. Stays `None` unless `Runner::new` is given a Postgres connection
    /// string (`args.pgconnstr`).
    filecache: Option<FileCacheState>,
    /// Cgroup memory watcher plus the current upscale threshold. Stays `None` unless
    /// `Runner::new` is given a cgroup name (`args.cgroup`).
    cgroup: Option<CgroupState>,
    /// Connection to the agent: inbound messages are read from `dispatcher.source` and
    /// outbound messages are written with `dispatcher.send`.
    dispatcher: Dispatcher,

    /// We "mint" new message ids by incrementing this counter and taking the value.
    ///
    /// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated
    /// by us vs the autoscaler-agent.
    counter: usize,

    /// When the most recent upscale request was sent, or `None` if none has been sent yet.
    /// `run` uses this to send at most one upscale request per second.
    last_upscale_request_at: Option<Instant>,

    /// A signal to kill the main thread produced by `self.run()`. This is triggered
    /// when the server receives a new connection. When the thread receives the
    /// signal off this channel, it will gracefully shutdown.
    kill: broadcast::Receiver<()>,
}
      45              : 
/// Cgroup-related state owned by a `Runner` that was started with a cgroup name.
#[derive(Debug)]
struct CgroupState {
    /// Latest memory history together with an [`Instant`] timestamp.
    ///
    /// The initial value is created in `Runner::new` with `Instant::now()` and an all-zero
    /// history; the sending half is handed to `CgroupWatcher::watch`, which presumably
    /// publishes every later value.
    watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
    /// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
    /// requests.
    ///
    /// Recomputed via `Config::cgroup_threshold` on every accepted downscale and every upscale.
    threshold: u64,
}
      53              : 
/// Configuration for a `Runner`
#[derive(Debug)]
pub struct Config {
    /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses before
    /// handing out the rest to userspace. This value is the estimated difference between the
    /// *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`.
    ///
    /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM
    /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)".
    ///
    /// We only use `sys_buffer_bytes` when calculating the system memory from the *external* memory
    /// size, rather than the self-reported memory size, according to the kernel.
    ///
    /// Must be non-zero: `Runner::new` rejects a config where this is `0`.
    ///
    /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
    /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field
    /// should be removed once we have a better solution there.
    sys_buffer_bytes: u64,

    /// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
    /// other words, providing a ceiling for the highest value of the threshold by enforcing that
    /// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
    /// threshold.
    ///
    /// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
    /// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
    /// memory.
    ///
    /// The default value of `0.15` means that we *guarantee* sending upscale requests if the
    /// cgroup is using more than 85% of total memory.
    cgroup_min_overhead_fraction: f64,

    /// Headroom, in bytes, that must remain between the cgroup's current average
    /// non-reclaimable memory usage and the threshold that a downscale would produce.
    ///
    /// `Runner::try_downscale` refuses a downscale when the new threshold would be lower than
    /// `current usage + cgroup_downscale_threshold_buffer_bytes`. Defaults to 100 MiB.
    cgroup_downscale_threshold_buffer_bytes: u64,
}
      87              : 
      88              : impl Default for Config {
      89            0 :     fn default() -> Self {
      90            0 :         Self {
      91            0 :             sys_buffer_bytes: 100 * MiB,
      92            0 :             cgroup_min_overhead_fraction: 0.15,
      93            0 :             cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
      94            0 :         }
      95            0 :     }
      96              : }
      97              : 
      98              : impl Config {
      99            0 :     fn cgroup_threshold(&self, total_mem: u64) -> u64 {
     100            0 :         // We want our threshold to be met gracefully instead of letting postgres get OOM-killed
     101            0 :         // (or if there's room, spilling to swap).
     102            0 :         // So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
     103            0 :         // remaining above the threshold.
     104            0 :         (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64
     105            0 :     }
     106              : }
     107              : 
     108              : impl Runner {
    /// Create a new monitor.
    ///
    /// Setup happens in this order:
    /// 1. the dispatcher is created on top of `ws`;
    /// 2. if `args.pgconnstr` is set, the file cache is connected and its size is set from the
    ///    total system memory;
    /// 3. if `args.cgroup` is set, a cgroup watcher is created and spawned as a background
    ///    task bound to `token`, and the initial threshold is computed from the total system
    ///    memory.
    ///
    /// # Errors
    ///
    /// Fails if `config.sys_buffer_bytes` is `0`, if the dispatcher, file cache or cgroup
    /// watcher cannot be created, or if the file cache size cannot be read or set.
    #[tracing::instrument(skip_all, fields(?config, ?args))]
    pub async fn new(
        config: Config,
        args: &Args,
        ws: WebSocket,
        kill: broadcast::Receiver<()>,
        token: CancellationToken,
    ) -> anyhow::Result<Runner> {
        anyhow::ensure!(
            config.sys_buffer_bytes != 0,
            "invalid monitor Config: sys_buffer_bytes cannot be 0"
        );

        let dispatcher = Dispatcher::new(ws)
            .await
            .context("error creating new dispatcher")?;

        // Start with neither file cache nor cgroup; each is filled in below only when the
        // corresponding argument was provided.
        let mut state = Runner {
            config,
            filecache: None,
            cgroup: None,
            dispatcher,
            counter: 1, // NB: must be odd, see the comment about the field for more.
            last_upscale_request_at: None,
            kill,
        };

        // Total system memory; used to size both the file cache and the cgroup threshold.
        let mem = get_total_system_memory();

        if let Some(connstr) = &args.pgconnstr {
            info!("initializing file cache");
            let config = FileCacheConfig::default();

            let mut file_cache = FileCacheState::new(connstr, config, token.clone())
                .await
                .context("failed to create file cache")?;

            // The current size is only fetched so it can be logged next to the new size.
            let size = file_cache
                .get_file_cache_size()
                .await
                .context("error getting file cache size")?;

            let new_size = file_cache.config.calculate_cache_size(mem);
            info!(
                initial = bytes_to_mebibytes(size),
                new = bytes_to_mebibytes(new_size),
                "setting initial file cache size",
            );

            // note: even if size == new_size, we want to explicitly set it, just
            // to make sure that we have the permissions to do so
            let actual_size = file_cache
                .set_file_cache_size(new_size)
                .await
                .context("failed to set file cache size, possibly due to inadequate permissions")?;
            // The size that was applied may differ from the requested one; that is only logged.
            if actual_size != new_size {
                info!("file cache size actually got set to {actual_size}")
            }

            state.filecache = Some(file_cache);
        }

        if let Some(name) = &args.cgroup {
            // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
            // now, and then set limits later.
            info!("initializing cgroup");

            let cgroup =
                CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;

            // Placeholder history with zero samples. `try_downscale` relies on the sample
            // count and on the `Instant::now()` timestamp below to detect missing stats.
            let init_value = cgroup::MemoryHistory {
                avg_non_reclaimable: 0,
                samples_count: 0,
                samples_span: Duration::ZERO,
            };
            let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));

            // The watcher runs in the background and receives the sending half of the channel.
            // `token` is moved in here; the file cache above got its own clone.
            spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
                cgroup.watch(hist_tx).await
            });

            let threshold = state.config.cgroup_threshold(mem);
            info!(threshold, "set initial cgroup threshold",);

            state.cgroup = Some(CgroupState {
                watcher: hist_rx,
                threshold,
            });
        }

        Ok(state)
    }
     202              : 
     203              :     /// Attempt to downscale filecache + cgroup
     204              :     #[tracing::instrument(skip_all, fields(?target))]
     205              :     pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
     206              :         // Nothing to adjust
     207              :         if self.cgroup.is_none() && self.filecache.is_none() {
     208              :             info!("no action needed for downscale (no cgroup or file cache enabled)");
     209              :             return Ok((
     210              :                 true,
     211              :                 "monitor is not managing cgroup or file cache".to_string(),
     212              :             ));
     213              :         }
     214              : 
     215              :         let requested_mem = target.mem;
     216              :         let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
     217              :         let expected_file_cache_size = self
     218              :             .filecache
     219              :             .as_ref()
     220            0 :             .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
     221              :             .unwrap_or(0);
     222              :         if let Some(cgroup) = &self.cgroup {
     223              :             let (last_time, last_history) = *cgroup.watcher.borrow();
     224              : 
     225              :             // NB: The ordering of these conditions is intentional. During startup, we should deny
     226              :             // downscaling until we have enough information to determine that it's safe to do so
     227              :             // (i.e. enough samples have come in). But if it's been a while and we *still* haven't
     228              :             // received any information, we should *fail* instead of just denying downscaling.
     229              :             //
     230              :             // `last_time` is set to `Instant::now()` on startup, so checking `last_time.elapsed()`
     231              :             // serves double-duty: it trips if we haven't received *any* metrics for long enough,
     232              :             // OR if we haven't received metrics *recently enough*.
     233              :             //
     234              :             // TODO: make the duration here configurable.
     235              :             if last_time.elapsed() > Duration::from_secs(5) {
     236              :                 bail!(
     237              :                     "haven't gotten cgroup memory stats recently enough to determine downscaling information"
     238              :                 );
     239              :             } else if last_history.samples_count <= 1 {
     240              :                 let status = "haven't received enough cgroup memory stats yet";
     241              :                 info!(status, "discontinuing downscale");
     242              :                 return Ok((false, status.to_owned()));
     243              :             }
     244              : 
     245              :             let new_threshold = self.config.cgroup_threshold(usable_system_memory);
     246              : 
     247              :             let current = last_history.avg_non_reclaimable;
     248              : 
     249              :             if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
     250              :                 let status = format!(
     251              :                     "{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
     252              :                     "calculated memory threshold too low",
     253              :                     bytes_to_mebibytes(new_threshold),
     254              :                     bytes_to_mebibytes(current),
     255              :                     bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
     256              :                 );
     257              : 
     258              :                 info!(status, "discontinuing downscale");
     259              : 
     260              :                 return Ok((false, status));
     261              :             }
     262              :         }
     263              : 
     264              :         // The downscaling has been approved. Downscale the file cache, then the cgroup.
     265              :         let mut status = vec![];
     266              :         if let Some(file_cache) = &mut self.filecache {
     267              :             let actual_usage = file_cache
     268              :                 .set_file_cache_size(expected_file_cache_size)
     269              :                 .await
     270              :                 .context("failed to set file cache size")?;
     271              :             let message = format!(
     272              :                 "set file cache size to {} MiB",
     273              :                 bytes_to_mebibytes(actual_usage),
     274              :             );
     275              :             info!("downscale: {message}");
     276              :             status.push(message);
     277              :         }
     278              : 
     279              :         if let Some(cgroup) = &mut self.cgroup {
     280              :             let new_threshold = self.config.cgroup_threshold(usable_system_memory);
     281              : 
     282              :             let message = format!(
     283              :                 "set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
     284              :                 bytes_to_mebibytes(cgroup.threshold),
     285              :                 bytes_to_mebibytes(new_threshold),
     286              :                 bytes_to_mebibytes(usable_system_memory)
     287              :             );
     288              :             cgroup.threshold = new_threshold;
     289              :             info!("downscale: {message}");
     290              :             status.push(message);
     291              :         }
     292              : 
     293              :         // TODO: make this status thing less jank
     294              :         let status = status.join("; ");
     295              :         Ok((true, status))
     296              :     }
     297              : 
     298              :     /// Handle new resources
     299              :     #[tracing::instrument(skip_all, fields(?resources))]
     300              :     pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
     301              :         if self.filecache.is_none() && self.cgroup.is_none() {
     302              :             info!("no action needed for upscale (no cgroup or file cache enabled)");
     303              :             return Ok(());
     304              :         }
     305              : 
     306              :         let new_mem = resources.mem;
     307              :         let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
     308              : 
     309              :         if let Some(file_cache) = &mut self.filecache {
     310              :             let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
     311              :             info!(
     312              :                 target = bytes_to_mebibytes(expected_usage),
     313              :                 total = bytes_to_mebibytes(new_mem),
     314              :                 "updating file cache size",
     315              :             );
     316              : 
     317              :             let actual_usage = file_cache
     318              :                 .set_file_cache_size(expected_usage)
     319              :                 .await
     320              :                 .context("failed to set file cache size")?;
     321              : 
     322              :             if actual_usage != expected_usage {
     323              :                 warn!(
     324              :                     "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib",
     325              :                     bytes_to_mebibytes(expected_usage),
     326              :                     bytes_to_mebibytes(actual_usage)
     327              :                 )
     328              :             }
     329              :         }
     330              : 
     331              :         if let Some(cgroup) = &mut self.cgroup {
     332              :             let new_threshold = self.config.cgroup_threshold(usable_system_memory);
     333              : 
     334              :             info!(
     335              :                 "set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
     336              :                 bytes_to_mebibytes(cgroup.threshold),
     337              :                 bytes_to_mebibytes(new_threshold),
     338              :                 bytes_to_mebibytes(usable_system_memory)
     339              :             );
     340              :             cgroup.threshold = new_threshold;
     341              :         }
     342              : 
     343              :         Ok(())
     344              :     }
     345              : 
     346              :     /// Take in a message and perform some action, such as downscaling or upscaling,
     347              :     /// and return a message to be send back.
     348              :     #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
     349              :     pub async fn process_message(
     350              :         &mut self,
     351              :         InboundMsg { inner, id }: InboundMsg,
     352              :     ) -> anyhow::Result<Option<OutboundMsg>> {
     353              :         match inner {
     354              :             InboundMsgKind::UpscaleNotification { granted } => {
     355              :                 self.handle_upscale(granted)
     356              :                     .await
     357              :                     .context("failed to handle upscale")?;
     358              :                 Ok(Some(OutboundMsg::new(
     359              :                     OutboundMsgKind::UpscaleConfirmation {},
     360              :                     id,
     361              :                 )))
     362              :             }
     363              :             InboundMsgKind::DownscaleRequest { target } => self
     364              :                 .try_downscale(target)
     365              :                 .await
     366              :                 .context("failed to downscale")
     367            0 :                 .map(|(ok, status)| {
     368            0 :                     Some(OutboundMsg::new(
     369            0 :                         OutboundMsgKind::DownscaleResult { ok, status },
     370            0 :                         id,
     371            0 :                     ))
     372            0 :                 }),
     373              :             InboundMsgKind::InvalidMessage { error } => {
     374              :                 warn!(
     375              :                     error = format_args!("{error:#}"),
     376              :                     id, "received notification of an invalid message we sent"
     377              :                 );
     378              :                 Ok(None)
     379              :             }
     380              :             InboundMsgKind::InternalError { error } => {
     381              :                 warn!(
     382              :                     error = format_args!("{error:#}"),
     383              :                     id, "agent experienced an internal error"
     384              :                 );
     385              :                 Ok(None)
     386              :             }
     387              :             InboundMsgKind::HealthCheck {} => {
     388              :                 Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
     389              :             }
     390              :         }
     391              :     }
     392              : 
     393              :     // TODO: don't propagate errors, probably just warn!?
     394              :     #[tracing::instrument(skip_all)]
     395              :     pub async fn run(&mut self) -> anyhow::Result<()> {
     396              :         info!("starting dispatcher");
     397              :         loop {
     398              :             tokio::select! {
     399              :                 signal = self.kill.recv() => {
     400              :                     match signal {
     401              :                         Ok(()) => return Ok(()),
     402              :                         Err(e) => bail!("failed to receive kill signal: {e}")
     403              :                     }
     404              :                 }
     405              : 
     406              :                 // New memory stats from the cgroup, *may* need to request upscaling, if we've
     407              :                 // exceeded the threshold
     408              :                 result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
     409              :                     result.context("failed to receive from cgroup memory stats watcher")?;
     410              : 
     411              :                     let cgroup = self.cgroup.as_ref().unwrap();
     412              : 
     413              :                     let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
     414              : 
     415              :                     // If we haven't exceeded the threshold, then we're all ok
     416              :                     if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
     417              :                         continue;
     418              :                     }
     419              : 
     420              :                     // Otherwise, we generally want upscaling. But, if it's been less than 1 second
     421              :                     // since the last time we requested upscaling, ignore the event, to avoid
     422              :                     // spamming the agent.
     423              :                     if let Some(t) = self.last_upscale_request_at {
     424              :                         let elapsed = t.elapsed();
     425              :                         if elapsed < Duration::from_secs(1) {
     426              :                             // *Ideally* we'd like to log here that we're ignoring the fact the
     427              :                             // memory stats are too high, but in practice this can result in
     428              :                             // spamming the logs with repetitive messages about ignoring the signal
     429              :                             //
     430              :                             // See https://github.com/neondatabase/neon/issues/5865 for more.
     431              :                             continue;
     432              :                         }
     433              :                     }
     434              : 
     435              :                     self.last_upscale_request_at = Some(Instant::now());
     436              : 
     437              :                     info!(
     438              :                         avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
     439              :                         threshold = bytes_to_mebibytes(cgroup.threshold),
     440              :                         "cgroup memory stats are high enough to upscale, requesting upscale",
     441              :                     );
     442              : 
     443              :                     self.counter += 2; // Increment, preserving parity (i.e. keep the
     444              :                                        // counter odd). See the field comment for more.
     445              :                     self.dispatcher
     446              :                         .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
     447              :                         .await
     448              :                         .context("failed to send message")?;
     449              :                 },
     450              : 
     451              :                 // there is a message from the agent
     452              :                 msg = self.dispatcher.source.next() => {
     453              :                     if let Some(msg) = msg {
     454              :                         match &msg {
     455              :                             Ok(msg) => {
     456              :                                 let message: InboundMsg = match msg {
     457              :                                     Message::Text(text) => {
     458              :                                         serde_json::from_str(text).context("failed to deserialize text message")?
     459              :                                     }
     460              :                                     other => {
     461              :                                         warn!(
     462              :                                             // Don't use 'message' as a key as the
     463              :                                             // string also uses that for its key
     464              :                                             msg = ?other,
     465              :                                             "problem processing incoming message: agent should only send text messages but received different type"
     466              :                                         );
     467              :                                         continue
     468              :                                     },
     469              :                                 };
     470              : 
     471              :                                 if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
     472              :                                     debug!(?msg, "received message");
     473              :                                 } else {
     474              :                                     info!(?msg, "received message");
     475              :                                 }
     476              : 
     477              :                                 let out = match self.process_message(message.clone()).await {
     478              :                                     Ok(Some(out)) => out,
     479              :                                     Ok(None) => continue,
     480              :                                     Err(e) => {
     481              :                                         // use {:#} for our logging because the display impl only
     482              :                                         // gives the outermost cause, and the debug impl
     483              :                                         // pretty-prints the error, whereas {:#} contains all the
     484              :                                         // causes, but is compact (no newlines).
     485              :                                         warn!(error = format_args!("{e:#}"), "error handling message");
     486              :                                         OutboundMsg::new(
     487              :                                             OutboundMsgKind::InternalError {
     488              :                                                 error: e.to_string(),
     489              :                                             },
     490              :                                             message.id
     491              :                                         )
     492              :                                     }
     493              :                                 };
     494              : 
     495              :                                 self.dispatcher
     496              :                                     .send(out)
     497              :                                     .await
     498              :                                     .context("failed to send message")?;
     499              :                             }
     500              :                             Err(e) => warn!(
     501              :                                 error = format_args!("{e:#}"),
     502              :                                 msg = ?msg,
     503              :                                 "received error message"
     504              :                             ),
     505              :                         }
     506              :                     } else {
     507              :                         anyhow::bail!("dispatcher connection closed")
     508              :                     }
     509              :                 }
     510              :             }
     511              :         }
     512              :     }
     513              : }
        

Generated by: LCOV version 2.1-beta