LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - runner.rs (source / functions) Coverage Total Hit
Test: fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info Lines: 0.0 % 41 0
Test Date: 2024-09-24 13:57:57 Functions: 0.0 % 11 0

            Line data    Source code
       1              : //! Exposes the `Runner`, which handles messages received from agent and
       2              : //! sends upscale requests.
       3              : //!
       4              : //! This is the "Monitor" part of the monitor binary and is the main entrypoint for
       5              : //! all functionality.
       6              : 
       7              : use std::fmt::Debug;
       8              : use std::time::{Duration, Instant};
       9              : 
      10              : use anyhow::{bail, Context};
      11              : use axum::extract::ws::{Message, WebSocket};
      12              : use futures::StreamExt;
      13              : use tokio::sync::{broadcast, watch};
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::{debug, error, info, warn};
      16              : 
      17              : use crate::cgroup::{self, CgroupWatcher};
      18              : use crate::dispatcher::Dispatcher;
      19              : use crate::filecache::{FileCacheConfig, FileCacheState};
      20              : use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
      21              : use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
      22              : 
      23              : /// Central struct of the monitor: it talks to the agent through the dispatcher and
      24              : /// adjusts the file cache and the cgroup threshold in response to the agent's messages.
      25              : #[derive(Debug)]
      26              : pub struct Runner {
      27              :     config: Config, // buffers and fractions used to derive memory limits
      28              :     filecache: Option<FileCacheState>, // `Some` only when `args.pgconnstr` was given to `new`
      29              :     cgroup: Option<CgroupState>, // `Some` only when `args.cgroup` was given to `new`
      30              :     dispatcher: Dispatcher, // built from the agent websocket; sends and receives messages
      31              : 
      32              :     /// We "mint" new message ids by incrementing this counter and taking the value.
      33              :     ///
      34              :     /// **Note**: This counter is always odd (it starts at 1 and is only ever bumped by 2), so
      35              :     /// that we avoid collisions between the IDs generated by us vs the autoscaler-agent.
      36              :     counter: usize,
      37              :     /// When the last upscale request was sent; `run` uses it to send at most one per second.
      38              :     last_upscale_request_at: Option<Instant>,
      39              : 
      40              :     /// A signal to kill the main thread produced by `self.run()`. This is triggered
      41              :     /// when the server receives a new connection. When the thread receives the
      42              :     /// signal off this channel, it will gracefully shutdown.
      43              :     kill: broadcast::Receiver<()>,
      44              : }
      45              : 
      46              : #[derive(Debug)]
      47              : struct CgroupState { // cgroup-related state stored in `Runner::cgroup`
      48              :     watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>, // (timestamp, latest history); the sender is handed to `CgroupWatcher::watch`
      49              :     /// Threshold, in bytes, for [`cgroup::MemoryHistory::avg_non_reclaimable`]: once the average
      50              :     /// reaches or exceeds it, `Runner::run` sends upscale requests (at most one per second).
      51              :     threshold: u64,
      52              : }
      53              : 
      54              : /// Configuration for a `Runner`: the tunables that feed the file cache size and cgroup threshold calculations.
      55              : #[derive(Debug)]
      56              : pub struct Config {
      57              :     /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses before
      58              :     /// handing out the rest to userspace. This value is the estimated difference between the
      59              :     /// *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`.
      60              :     ///
      61              :     /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM
      62              :     /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)".
      63              :     ///
      64              :     /// We only use `sys_buffer_bytes` when calculating the system memory from the *external* memory
      65              :     /// size, rather than the self-reported memory size, according to the kernel.
      66              :     ///
      67              :     /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
      68              :     /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field
      69              :     /// should be removed once we have a better solution there.
      70              :     sys_buffer_bytes: u64, // must be non-zero: `Runner::new` rejects a config where this is 0
      71              : 
      72              :     /// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
      73              :     /// other words, providing a ceiling for the highest value of the threshold by enforcing that
      74              :     /// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
      75              :     /// threshold.
      76              :     ///
      77              :     /// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
      78              :     /// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
      79              :     /// memory.
      80              :     ///
      81              :     /// The default value of `0.15` means that we *guarantee* sending upscale requests if the
      82              :     /// cgroup is using more than 85% of total memory (even if we're *not* separately reserving
      83              :     /// memory for the file cache).
      84              :     cgroup_min_overhead_fraction: f64,
      85              :     /// Headroom, in bytes, that a downscale must leave: `try_downscale` denies the request when the new threshold would be below current average non-reclaimable usage plus this buffer.
      86              :     cgroup_downscale_threshold_buffer_bytes: u64,
      87              : }
      88              : 
      89              : impl Default for Config { // stock values for the monitor's tunables
      90            0 :     fn default() -> Self {
      91            0 :         Self {
      92            0 :             sys_buffer_bytes: 100 * MiB, // non-zero, as `Runner::new` requires
      93            0 :             cgroup_min_overhead_fraction: 0.15, // caps the cgroup threshold at 85% of total memory
      94            0 :             cgroup_downscale_threshold_buffer_bytes: 100 * MiB, // downscale needs new threshold >= usage + this
      95            0 :         }
      96            0 :     }
      97              : }
      98              : 
      99              : impl Config {
     100            0 :     fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 { // non-reclaimable usage level at which upscaling is requested
     101            0 :         // If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup,
     102            0 :         // and thus be non-reclaimable, so we should allow for additional memory usage.
     103            0 :         //
     104            0 :         // If the file cache sits on disk, our desired stable system state is for it to be fully
     105            0 :         // page cached (its contents should only be paged to/from disk in situations where we can't
     106            0 :         // upscale fast enough). Page-cached memory is reclaimable, so we need to lower the
     107            0 :         // threshold for non-reclaimable memory so we scale up *before* the kernel starts paging
     108            0 :         // out the file cache. `saturating_sub` clamps at 0 if the cache is larger than total memory.
     109            0 :         let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size);
     110            0 : 
     111            0 :         // Even if we're not separately making room for the file cache (if it's in tmpfs), we still
     112            0 :         // want our threshold to be met gracefully instead of letting postgres get OOM-killed.
     113            0 :         // So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
     114            0 :         // remaining above the threshold.
     115            0 :         let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64;
     116            0 :         // The `as u64` cast above drops the fractional part; the tighter of the two limits wins.
     117            0 :         memory_remaining_for_cgroup.min(max_threshold)
     118            0 :     }
     119              : }
     120              : 
     121              : impl Runner {
     122              :     /// Create a new monitor: builds the dispatcher over `ws`, then sets up the file cache (if `args.pgconnstr` is set) and the cgroup watcher (if `args.cgroup` is set). Fails if `config.sys_buffer_bytes` is 0 or if any setup step errors.
     123            0 :     #[tracing::instrument(skip_all, fields(?config, ?args))]
     124              :     pub async fn new(
     125              :         config: Config,
     126              :         args: &Args,
     127              :         ws: WebSocket,
     128              :         kill: broadcast::Receiver<()>,
     129              :         token: CancellationToken,
     130              :     ) -> anyhow::Result<Runner> {
     131              :         anyhow::ensure!(
     132              :             config.sys_buffer_bytes != 0,
     133              :             "invalid monitor Config: sys_buffer_bytes cannot be 0"
     134              :         );
     135              : 
     136              :         let dispatcher = Dispatcher::new(ws)
     137              :             .await
     138              :             .context("error creating new dispatcher")?;
     139              :         // Start with neither file cache nor cgroup; both are filled in below when configured.
     140              :         let mut state = Runner {
     141              :             config,
     142              :             filecache: None,
     143              :             cgroup: None,
     144              :             dispatcher,
     145              :             counter: 1, // NB: must be odd, see the comment about the field for more.
     146              :             last_upscale_request_at: None,
     147              :             kill,
     148              :         };
     149              :         // Basis for both the initial file cache size and the initial cgroup threshold.
     150              :         let mem = get_total_system_memory();
     151              :         // Stays 0 unless a file cache is set up below.
     152              :         let mut file_cache_disk_size = 0;
     153              : 
     154              :         // We need to process file cache initialization before cgroup initialization, so that the memory
     155              :         // allocated to the file cache is appropriately taken into account when we decide the cgroup's
     156              :         // memory limits.
     157              :         if let Some(connstr) = &args.pgconnstr {
     158              :             info!("initializing file cache");
     159              :             let config = FileCacheConfig::default();
     160              : 
     161              :             let mut file_cache = FileCacheState::new(connstr, config, token.clone())
     162              :                 .await
     163              :                 .context("failed to create file cache")?;
     164              :             // The current size is only read so it can be logged next to the new one.
     165              :             let size = file_cache
     166              :                 .get_file_cache_size()
     167              :                 .await
     168              :                 .context("error getting file cache size")?;
     169              : 
     170              :             let new_size = file_cache.config.calculate_cache_size(mem);
     171              :             info!(
     172              :                 initial = bytes_to_mebibytes(size),
     173              :                 new = bytes_to_mebibytes(new_size),
     174              :                 "setting initial file cache size",
     175              :             );
     176              : 
     177              :             // note: even if size == new_size, we want to explicitly set it, just
     178              :             // to make sure that we have the permissions to do so
     179              :             let actual_size = file_cache
     180              :                 .set_file_cache_size(new_size)
     181              :                 .await
     182              :                 .context("failed to set file cache size, possibly due to inadequate permissions")?;
     183              :             if actual_size != new_size {
     184              :                 info!("file cache size actually got set to {actual_size}")
     185              :             }
     186              :             // Use the size that actually took effect, not the requested one, for the cgroup threshold.
     187              :             file_cache_disk_size = actual_size;
     188              :             state.filecache = Some(file_cache);
     189              :         }
     190              : 
     191              :         if let Some(name) = &args.cgroup {
     192              :             // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
     193              :             // now, and then set limits later.
     194              :             info!("initializing cgroup");
     195              : 
     196              :             let cgroup =
     197              :                 CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;
     198              :             // Placeholder history with zero samples; `try_downscale` denies downscaling while `samples_count <= 1`.
     199              :             let init_value = cgroup::MemoryHistory {
     200              :                 avg_non_reclaimable: 0,
     201              :                 samples_count: 0,
     202              :                 samples_span: Duration::ZERO,
     203              :             };
     204              :             let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));
     205              :             // Hand the sender to the watcher task; the error log fires if that task terminates (presumably tied to `token` — confirm in `spawn_with_cancel`).
     206            0 :             spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
     207            0 :                 cgroup.watch(hist_tx).await
     208            0 :             });
     209              : 
     210              :             let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size);
     211              :             info!(threshold, "set initial cgroup threshold",);
     212              : 
     213              :             state.cgroup = Some(CgroupState {
     214              :                 watcher: hist_rx,
     215              :                 threshold,
     216              :             });
     217              :         }
     218              : 
     219              :         Ok(state)
     220              :     }
     221              : 
     222              :     /// Attempt to downscale filecache + cgroup to fit `target`. Returns `(ok, status)`: `ok == false` means the downscale was denied, and `status` is a human-readable explanation either way. Returns `Err` if cgroup stats are stale or the file cache cannot be resized.
     223            0 :     #[tracing::instrument(skip_all, fields(?target))]
     224              :     pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
     225              :         // Nothing to adjust
     226              :         if self.cgroup.is_none() && self.filecache.is_none() {
     227              :             info!("no action needed for downscale (no cgroup or file cache enabled)");
     228              :             return Ok((
     229              :                 true,
     230              :                 "monitor is not managing cgroup or file cache".to_string(),
     231              :             ));
     232              :         }
     233              :         // `target.mem` is an external memory size, so discount the kernel's estimated share (see `Config::sys_buffer_bytes`).
     234              :         let requested_mem = target.mem;
     235              :         let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
     236              :         let expected_file_cache_size = self
     237              :             .filecache
     238              :             .as_ref()
     239            0 :             .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
     240              :             .unwrap_or(0);
     241              :         if let Some(cgroup) = &self.cgroup {
     242              :             let (last_time, last_history) = *cgroup.watcher.borrow();
     243              : 
     244              :             // NB: The ordering of these conditions is intentional. During startup, we should deny
     245              :             // downscaling until we have enough information to determine that it's safe to do so
     246              :             // (i.e. enough samples have come in). But if it's been a while and we *still* haven't
     247              :             // received any information, we should *fail* instead of just denying downscaling.
     248              :             //
     249              :             // `last_time` is set to `Instant::now()` on startup, so checking `last_time.elapsed()`
     250              :             // serves double-duty: it trips if we haven't received *any* metrics for long enough,
     251              :             // OR if we haven't received metrics *recently enough*.
     252              :             //
     253              :             // TODO: make the duration here configurable.
     254              :             if last_time.elapsed() > Duration::from_secs(5) {
     255              :                 bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
     256              :             } else if last_history.samples_count <= 1 {
     257              :                 let status = "haven't received enough cgroup memory stats yet";
     258              :                 info!(status, "discontinuing downscale");
     259              :                 return Ok((false, status.to_owned()));
     260              :             }
     261              :             // Threshold we would end up with if the downscale went through, based on the *expected* file cache size.
     262              :             let new_threshold = self
     263              :                 .config
     264              :                 .cgroup_threshold(usable_system_memory, expected_file_cache_size);
     265              : 
     266              :             let current = last_history.avg_non_reclaimable;
     267              :             // Deny when the new threshold would leave less than the configured buffer above current usage.
     268              :             if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
     269              :                 let status = format!(
     270              :                     "{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
     271              :                     "calculated memory threshold too low",
     272              :                     bytes_to_mebibytes(new_threshold),
     273              :                     bytes_to_mebibytes(current),
     274              :                     bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
     275              :                 );
     276              : 
     277              :                 info!(status, "discontinuing downscale");
     278              : 
     279              :                 return Ok((false, status));
     280              :             }
     281              :         }
     282              : 
     283              :         // The downscaling has been approved. Downscale the file cache, then the cgroup.
     284              :         let mut status = vec![];
     285              :         let mut file_cache_disk_size = 0;
     286              :         if let Some(file_cache) = &mut self.filecache {
     287              :             let actual_usage = file_cache
     288              :                 .set_file_cache_size(expected_file_cache_size)
     289              :                 .await
     290              :                 .context("failed to set file cache size")?;
     291              :             file_cache_disk_size = actual_usage;
     292              :             let message = format!(
     293              :                 "set file cache size to {} MiB",
     294              :                 bytes_to_mebibytes(actual_usage),
     295              :             );
     296              :             info!("downscale: {message}");
     297              :             status.push(message);
     298              :         }
     299              :         // Recompute the threshold from the size the file cache actually reported, rather than the expected one.
     300              :         if let Some(cgroup) = &mut self.cgroup {
     301              :             let new_threshold = self
     302              :                 .config
     303              :                 .cgroup_threshold(usable_system_memory, file_cache_disk_size);
     304              : 
     305              :             let message = format!(
     306              :                 "set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
     307              :                 bytes_to_mebibytes(cgroup.threshold),
     308              :                 bytes_to_mebibytes(new_threshold),
     309              :                 bytes_to_mebibytes(usable_system_memory)
     310              :             );
     311              :             cgroup.threshold = new_threshold;
     312              :             info!("downscale: {message}");
     313              :             status.push(message);
     314              :         }
     315              : 
     316              :         // TODO: make this status thing less jank
     317              :         let status = status.join("; ");
     318              :         Ok((true, status))
     319              :     }
     320              : 
     321              :     /// Handle new resources: resize the file cache (if managed) for the new memory size, then recompute the cgroup threshold (if managed). Errors only if setting the file cache size fails.
     322            0 :     #[tracing::instrument(skip_all, fields(?resources))]
     323              :     pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
     324              :         if self.filecache.is_none() && self.cgroup.is_none() {
     325              :             info!("no action needed for upscale (no cgroup or file cache enabled)");
     326              :             return Ok(());
     327              :         }
     328              :         // `resources.mem` is an external memory size, so discount the kernel's estimated share (see `Config::sys_buffer_bytes`).
     329              :         let new_mem = resources.mem;
     330              :         let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
     331              :         // Remains 0 when no file cache is managed.
     332              :         let mut file_cache_disk_size = 0;
     333              :         if let Some(file_cache) = &mut self.filecache {
     334              :             let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
     335              :             info!(
     336              :                 target = bytes_to_mebibytes(expected_usage),
     337              :                 total = bytes_to_mebibytes(new_mem),
     338              :                 "updating file cache size",
     339              :             );
     340              : 
     341              :             let actual_usage = file_cache
     342              :                 .set_file_cache_size(expected_usage)
     343              :                 .await
     344              :                 .context("failed to set file cache size")?;
     345              :             file_cache_disk_size = actual_usage;
     346              :             // A mismatch is only logged; the threshold below is computed from the size actually reported.
     347              :             if actual_usage != expected_usage {
     348              :                 warn!(
     349              :                     "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib",
     350              :                     bytes_to_mebibytes(expected_usage),
     351              :                     bytes_to_mebibytes(actual_usage)
     352              :                 )
     353              :             }
     354              :         }
     355              : 
     356              :         if let Some(cgroup) = &mut self.cgroup {
     357              :             let new_threshold = self
     358              :                 .config
     359              :                 .cgroup_threshold(usable_system_memory, file_cache_disk_size);
     360              :             // Log the old and new values before overwriting the stored threshold.
     361              :             info!(
     362              :                 "set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
     363              :                 bytes_to_mebibytes(cgroup.threshold),
     364              :                 bytes_to_mebibytes(new_threshold),
     365              :                 bytes_to_mebibytes(usable_system_memory)
     366              :             );
     367              :             cgroup.threshold = new_threshold;
     368              :         }
     369              : 
     370              :         Ok(())
     371              :     }
     372              : 
     373              :     /// Take in a message and perform some action, such as downscaling or upscaling,
     374              :     /// and return a message to be sent back (reusing the inbound `id`), or `None` when no reply is needed.
     375            0 :     #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
     376              :     pub async fn process_message(
     377              :         &mut self,
     378              :         InboundMsg { inner, id }: InboundMsg,
     379              :     ) -> anyhow::Result<Option<OutboundMsg>> {
     380              :         match inner {
     381              :             InboundMsgKind::UpscaleNotification { granted } => { // apply the granted resources, then confirm
     382              :                 self.handle_upscale(granted)
     383              :                     .await
     384              :                     .context("failed to handle upscale")?;
     385              :                 Ok(Some(OutboundMsg::new(
     386              :                     OutboundMsgKind::UpscaleConfirmation {},
     387              :                     id,
     388              :                 )))
     389              :             }
     390              :             InboundMsgKind::DownscaleRequest { target } => self // reply says whether it was accepted, and why
     391              :                 .try_downscale(target)
     392              :                 .await
     393              :                 .context("failed to downscale")
     394            0 :                 .map(|(ok, status)| {
     395            0 :                     Some(OutboundMsg::new(
     396            0 :                         OutboundMsgKind::DownscaleResult { ok, status },
     397            0 :                         id,
     398            0 :                     ))
     399            0 :                 }),
     400              :             InboundMsgKind::InvalidMessage { error } => { // logged only; no reply
     401              :                 warn!(
     402              :                     %error, id, "received notification of an invalid message we sent"
     403              :                 );
     404              :                 Ok(None)
     405              :             }
     406              :             InboundMsgKind::InternalError { error } => { // logged only; no reply
     407              :                 warn!(error, id, "agent experienced an internal error");
     408              :                 Ok(None)
     409              :             }
     410              :             InboundMsgKind::HealthCheck {} => { // answer with a health check carrying the same id
     411              :                 Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
     412              :             }
     413              :         }
     414              :     }
     415              : 
     416              :     // TODO: don't propagate errors, probably just warn!?
     417            0 :     #[tracing::instrument(skip_all)]
     418              :     pub async fn run(&mut self) -> anyhow::Result<()> {
     419              :         info!("starting dispatcher");
     420              :         loop {
     421              :             tokio::select! {
     422              :                 signal = self.kill.recv() => {
     423              :                     match signal {
     424              :                         Ok(()) => return Ok(()),
     425              :                         Err(e) => bail!("failed to receive kill signal: {e}")
     426              :                     }
     427              :                 }
     428              : 
     429              :                 // New memory stats from the cgroup, *may* need to request upscaling, if we've
     430              :                 // exceeded the threshold
     431              :                 result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
     432              :                     result.context("failed to receive from cgroup memory stats watcher")?;
     433              : 
     434              :                     let cgroup = self.cgroup.as_ref().unwrap();
     435              : 
     436              :                     let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
     437              : 
     438              :                     // If we haven't exceeded the threshold, then we're all ok
     439              :                     if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
     440              :                         continue;
     441              :                     }
     442              : 
     443              :                     // Otherwise, we generally want upscaling. But, if it's been less than 1 second
     444              :                     // since the last time we requested upscaling, ignore the event, to avoid
     445              :                     // spamming the agent.
     446              :                     if let Some(t) = self.last_upscale_request_at {
     447              :                         let elapsed = t.elapsed();
     448              :                         if elapsed < Duration::from_secs(1) {
     449              :                             // *Ideally* we'd like to log here that we're ignoring the fact the
     450              :                             // memory stats are too high, but in practice this can result in
     451              :                             // spamming the logs with repetitive messages about ignoring the signal
     452              :                             //
     453              :                             // See https://github.com/neondatabase/neon/issues/5865 for more.
     454              :                             continue;
     455              :                         }
     456              :                     }
     457              : 
     458              :                     self.last_upscale_request_at = Some(Instant::now());
     459              : 
     460              :                     info!(
     461              :                         avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
     462              :                         threshold = bytes_to_mebibytes(cgroup.threshold),
     463              :                         "cgroup memory stats are high enough to upscale, requesting upscale",
     464              :                     );
     465              : 
     466              :                     self.counter += 2; // Increment, preserving parity (i.e. keep the
     467              :                                        // counter odd). See the field comment for more.
     468              :                     self.dispatcher
     469              :                         .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
     470              :                         .await
     471              :                         .context("failed to send message")?;
     472              :                 },
     473              : 
     474              :                 // there is a message from the agent
     475              :                 msg = self.dispatcher.source.next() => {
     476              :                     if let Some(msg) = msg {
     477              :                         match &msg {
     478              :                             Ok(msg) => {
     479              :                                 let message: InboundMsg = match msg {
     480              :                                     Message::Text(text) => {
     481              :                                         serde_json::from_str(text).context("failed to deserialize text message")?
     482              :                                     }
     483              :                                     other => {
     484              :                                         warn!(
     485              :                                             // Don't use 'message' as a key as the
     486              :                                             // string also uses that for its key
     487              :                                             msg = ?other,
     488              :                                             "problem processing incoming message: agent should only send text messages but received different type"
     489              :                                         );
     490              :                                         continue
     491              :                                     },
     492              :                                 };
     493              : 
     494              :                                 if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
     495              :                                     debug!(?msg, "received message");
     496              :                                 } else {
     497              :                                     info!(?msg, "received message");
     498              :                                 }
     499              : 
     500              :                                 let out = match self.process_message(message.clone()).await {
     501              :                                     Ok(Some(out)) => out,
     502              :                                     Ok(None) => continue,
     503              :                                     Err(e) => {
     504              :                                         // use {:#} for our logging because the display impl only
     505              :                                         // gives the outermost cause, and the debug impl
     506              :                                         // pretty-prints the error, whereas {:#} contains all the
     507              :                                         // causes, but is compact (no newlines).
     508              :                                         warn!(error = format!("{e:#}"), "error handling message");
     509              :                                         OutboundMsg::new(
     510              :                                             OutboundMsgKind::InternalError {
     511              :                                                 error: e.to_string(),
     512              :                                             },
     513              :                                             message.id
     514              :                                         )
     515              :                                     }
     516              :                                 };
     517              : 
     518              :                                 self.dispatcher
     519              :                                     .send(out)
     520              :                                     .await
     521              :                                     .context("failed to send message")?;
     522              :                             }
     523              :                             Err(e) => warn!(
     524              :                                 error = format!("{e}"),
     525              :                                 msg = ?msg,
     526              :                                 "received error message"
     527              :                             ),
     528              :                         }
     529              :                     } else {
     530              :                         anyhow::bail!("dispatcher connection closed")
     531              :                     }
     532              :                 }
     533              :             }
     534              :         }
     535              :     }
     536              : }
        

Generated by: LCOV version 2.1-beta