LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - cgroup.rs (source / functions)
Test:         20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info
Test Date:    2024-11-25 17:48:16
Coverage:     Lines: 62.9 % (90 of 143)    Functions: 22.7 % (5 of 22)

            Line data    Source code
       1              : use std::fmt::{self, Debug, Formatter};
       2              : use std::time::{Duration, Instant};
       3              : 
       4              : use anyhow::{anyhow, Context};
       5              : use cgroups_rs::{
       6              :     hierarchies::{self, is_cgroup2_unified_mode},
       7              :     memory::MemController,
       8              :     Subsystem,
       9              : };
      10              : use tokio::sync::watch;
      11              : use tracing::{info, warn};
      12              : 
      13              : /// Configuration for a `CgroupWatcher`
      14              : #[derive(Debug, Clone)]
      15              : pub struct Config {
      16              :     /// Interval at which we should be fetching memory statistics
      17              :     memory_poll_interval: Duration,
      18              : 
      19              :     /// The number of samples used in constructing aggregated memory statistics
      20              :     memory_history_len: usize,
      21              :     /// The number of most recent samples that will be periodically logged.
      22              :     ///
      23              :     /// Each sample is logged exactly once. Increasing this value means that recent samples will be
      24              :     /// logged less frequently, and vice versa.
      25              :     ///
      26              :     /// For simplicity, this value must be greater than or equal to `memory_history_len`.
      27              :     memory_history_log_interval: usize,
       28              :     /// The maximum amount of time logging may be skipped for, after which the history is logged even if it hasn't meaningfully changed
      29              :     memory_history_log_noskip_interval: Duration,
      30              : }
      31              : 
      32              : impl Default for Config {
      33            0 :     fn default() -> Self {
      34            0 :         Self {
      35            0 :             memory_poll_interval: Duration::from_millis(100),
      36            0 :             memory_history_len: 5, // use 500ms of history for decision-making
      37            0 :             memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
       38            0 :             memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 15 seconds have passed
      39            0 :         }
      40            0 :     }
      41              : }
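
For a sense of how these defaults combine: polling every 100ms with a 5-sample history gives ~500ms of data for decision-making, and the 20-sample log buffer means a full history is logged roughly every 2s (or forced at least every 15s even when nothing changes). A minimal sketch of that arithmetic, purely illustrative and not part of the module:

```rust
use std::time::Duration;

// Back-of-the-envelope check of how the default knobs above relate.
fn main() {
    let poll = Duration::from_millis(100); // memory_poll_interval
    let history_len = 5u32;                // memory_history_len
    let log_interval = 20u32;              // memory_history_log_interval

    assert_eq!(poll * history_len, Duration::from_millis(500)); // decision window
    assert_eq!(poll * log_interval, Duration::from_secs(2));    // typical gap between log lines
}
```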
      42              : 
      43              : /// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
       44              : /// OOM-killed or throttled.
      45              : ///
      46              : /// The `CgroupWatcher` primarily achieves this by reading from a stream of
      47              : /// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the
      48              : /// cgroup happy.
      49              : #[derive(Debug)]
      50              : pub struct CgroupWatcher {
      51              :     pub config: Config,
      52              : 
      53              :     /// The actual cgroup we are watching and managing.
      54              :     cgroup: cgroups_rs::Cgroup,
      55              : }
      56              : 
      57              : impl CgroupWatcher {
      58              :     /// Create a new `CgroupWatcher`.
      59            0 :     #[tracing::instrument(skip_all, fields(%name))]
      60              :     pub fn new(name: String) -> anyhow::Result<Self> {
      61              :         // TODO: clarify exactly why we need v2
      62              :         // Make sure cgroups v2 (aka unified) are supported
      63              :         if !is_cgroup2_unified_mode() {
      64              :             anyhow::bail!("cgroups v2 not supported");
      65              :         }
      66              :         let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
      67              : 
      68              :         Ok(Self {
      69              :             cgroup,
      70              :             config: Default::default(),
      71              :         })
      72              :     }
      73              : 
      74              :     /// The entrypoint for the `CgroupWatcher`.
      75            0 :     #[tracing::instrument(skip_all)]
      76              :     pub async fn watch(
      77              :         &self,
      78              :         updates: watch::Sender<(Instant, MemoryHistory)>,
      79              :     ) -> anyhow::Result<()> {
      80              :         // this requirement makes the code a bit easier to work with; see the config for more.
      81              :         assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
      82              : 
      83              :         let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
      84              :         ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
      85              :         // ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
      86              : 
      87              :         let mem_controller = self.memory()?;
      88              : 
      89              :         // buffer for samples that will be logged. once full, it remains so.
      90              :         let history_log_len = self.config.memory_history_log_interval;
      91              :         let max_skip = self.config.memory_history_log_noskip_interval;
      92              :         let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
      93              :         let mut last_logged_memusage = MemoryStatus::zeroed();
      94              : 
       95              :         // Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's Tier 1-supported systems.
      96              :         let mut can_skip_logs_until = Instant::now() - max_skip;
      97              : 
      98              :         for t in 0_u64.. {
      99              :             ticker.tick().await;
     100              : 
     101              :             let now = Instant::now();
     102              :             let mem = Self::memory_usage(mem_controller);
     103              : 
     104              :             let i = t as usize % history_log_len;
     105              :             history_log_buf[i] = mem;
     106              : 
     107              :             // We're taking *at most* memory_history_len values; we may be bounded by the total
     108              :             // number of samples that have come in so far.
     109              :             let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
     110              :             // NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
     111              :             // that we just inserted a value there, so the end of the iterator will *include* the
     112              :             // value at i, rather than stopping just short of it.
     113              :             let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
     114              : 
     115              :             let summary = MemoryHistory {
     116            0 :                 avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
     117              :                     / samples_count as u64,
     118              :                 samples_count,
     119              :                 samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
     120              :             };
     121              : 
     122              :             // Log the current history if it's time to do so. Because `history_log_buf` has length
     123              :             // equal to the logging interval, we can just log the entire buffer every time we set
     124              :             // the last entry, which also means that for this log line, we can ignore that it's a
     125              :             // ring buffer (because all the entries are in order of increasing time).
     126              :             //
     127              :             // We skip logging the data if data hasn't meaningfully changed in a while, unless
     128              :             // we've already ignored previous iterations for the last max_skip period.
     129              :             if i == history_log_len - 1
     130              :                 && (now > can_skip_logs_until
     131              :                     || !history_log_buf
     132              :                         .iter()
     133            0 :                         .all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
     134              :             {
     135              :                 info!(
     136              :                     history = ?MemoryStatus::debug_slice(&history_log_buf),
     137              :                     summary = ?summary,
     138              :                     "Recent cgroup memory statistics history"
     139              :                 );
     140              : 
     141              :                 can_skip_logs_until = now + max_skip;
     142              : 
     143              :                 last_logged_memusage = *history_log_buf.last().unwrap();
     144              :             }
     145              : 
     146              :             updates
     147              :                 .send((now, summary))
     148              :                 .context("failed to send MemoryHistory")?;
     149              :         }
     150              : 
     151              :         unreachable!()
     152              :     }
     153              : 
     154              :     /// Get a handle on the memory subsystem.
     155            0 :     fn memory(&self) -> anyhow::Result<&MemController> {
     156            0 :         self.cgroup
     157            0 :             .subsystems()
     158            0 :             .iter()
     159            0 :             .find_map(|sub| match sub {
     160            0 :                 Subsystem::Mem(c) => Some(c),
     161            0 :                 _ => None,
     162            0 :             })
     163            0 :             .ok_or_else(|| anyhow!("could not find memory subsystem"))
     164            0 :     }
     165              : 
     166              :     /// Given a handle on the memory subsystem, returns the current memory information
     167            0 :     fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
     168            0 :         let stat = mem_controller.memory_stat().stat;
     169            0 :         MemoryStatus {
     170            0 :             non_reclaimable: stat.active_anon + stat.inactive_anon,
     171            0 :         }
     172            0 :     }
     173              : }
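
A minimal sketch of how a caller might drive `CgroupWatcher`, assuming a tokio runtime; the function name, channel seeding, and task layout here are illustrative assumptions, not the crate's actual entrypoint:

```rust
use std::time::{Duration, Instant};
use tokio::sync::watch;

// Hypothetical driver: run the watcher and consume its memory summaries.
async fn run_cgroup_watcher(cgroup_name: String) -> anyhow::Result<()> {
    let watcher = CgroupWatcher::new(cgroup_name)?;

    // Seed the channel with a zeroed summary; `watch` overwrites it on every poll tick.
    let (tx, mut rx) = watch::channel((
        Instant::now(),
        MemoryHistory {
            avg_non_reclaimable: 0,
            samples_count: 0,
            samples_span: Duration::ZERO,
        },
    ));

    tokio::select! {
        // `watch` loops forever, so it only returns on error.
        res = watcher.watch(tx) => res,
        // Consumer side: react to each newly published summary.
        _ = async {
            while rx.changed().await.is_ok() {
                let (at, summary) = *rx.borrow();
                tracing::info!(?at, avg = summary.avg_non_reclaimable, "cgroup memory update");
            }
        } => Ok(()),
    }
}
```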
     174              : 
     175              : // Helper function for `CgroupWatcher::watch`
     176            9 : fn ring_buf_recent_values_iter<T>(
     177            9 :     buf: &[T],
     178            9 :     last_value_idx: usize,
     179            9 :     count: usize,
     180            9 : ) -> impl '_ + Iterator<Item = &T> {
     181            9 :     // Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
     182            9 :     // easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
     183            9 :     assert!(count <= buf.len());
     184              : 
     185            9 :     buf.iter()
     186            9 :         // 'cycle' because the values could wrap around
     187            9 :         .cycle()
     188            9 :         // with 'cycle', this skip is more like 'offset', and functionally this is
      189            9 :         // offsetting by 'last_value_idx - count (mod buf.len())', but we have to be
     190            9 :         // careful to avoid underflow, so we pre-add buf.len().
     191            9 :         // The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
     192            9 :         .skip((buf.len() + last_value_idx + 1 - count) % buf.len())
     193            9 :         .take(count)
     194            9 : }
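
To make the skip arithmetic concrete, one worked case (numbers chosen to match the `ring_buf_iter` test below; the snippet is illustrative, not part of the module):

```rust
// With buf.len() = 10, last_value_idx = 2 and count = 4, the offset is
// (10 + 2 + 1 - 4) % 10 = 9, so the iterator yields buf[9], buf[0], buf[1], buf[2].
fn main() {
    let buf: Vec<i32> = (0..10).collect();
    let (last_value_idx, count) = (2usize, 4usize);

    let offset = (buf.len() + last_value_idx + 1 - count) % buf.len();
    assert_eq!(offset, 9);

    let recent: Vec<i32> = buf.iter().cycle().skip(offset).take(count).copied().collect();
    assert_eq!(recent, [9, 0, 1, 2]);
}
```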
     195              : 
     196              : /// Summary of recent memory usage
     197              : #[derive(Debug, Copy, Clone)]
     198              : pub struct MemoryHistory {
      199              :     /// Rolling average of non-reclaimable memory usage across the most recent samples (spanning `samples_span`)
     200              :     pub avg_non_reclaimable: u64,
     201              : 
     202              :     /// The number of samples used to construct this summary
     203              :     pub samples_count: usize,
     204              :     /// Total timespan between the first and last sample used for this summary
     205              :     pub samples_span: Duration,
     206              : }
     207              : 
     208              : #[derive(Debug, Copy, Clone)]
     209              : pub struct MemoryStatus {
     210              :     non_reclaimable: u64,
     211              : }
     212              : 
     213              : impl MemoryStatus {
     214            0 :     fn zeroed() -> Self {
     215            0 :         MemoryStatus { non_reclaimable: 0 }
     216            0 :     }
     217              : 
     218            0 :     fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
     219              :         struct DS<'a>(&'a [MemoryStatus]);
     220              : 
     221              :         impl Debug for DS<'_> {
     222            0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     223            0 :                 f.debug_struct("[MemoryStatus]")
     224            0 :                     .field(
     225            0 :                         "non_reclaimable[..]",
     226            0 :                         &Fields(self.0, |stat: &MemoryStatus| {
     227            0 :                             BytesToGB(stat.non_reclaimable)
     228            0 :                         }),
     229            0 :                     )
     230            0 :                     .finish()
     231            0 :             }
     232              :         }
     233              : 
     234              :         struct Fields<'a, F>(&'a [MemoryStatus], F);
     235              : 
     236              :         impl<F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'_, F> {
     237            0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     238            0 :                 f.debug_list().entries(self.0.iter().map(&self.1)).finish()
     239            0 :             }
     240              :         }
     241              : 
     242              :         struct BytesToGB(u64);
     243              : 
     244              :         impl Debug for BytesToGB {
     245            0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     246            0 :                 f.write_fmt(format_args!(
     247            0 :                     "{:.3}Gi",
     248            0 :                     self.0 as f64 / (1_u64 << 30) as f64
     249            0 :                 ))
     250            0 :             }
     251              :         }
     252              : 
     253            0 :         DS(slice)
     254            0 :     }
     255              : 
      256              :     /// Check whether the other memory status is close to this one.
      257              :     /// Returns true if the difference between the two values is less than
      258              :     /// 1/8 of the smaller value and also less than 128MiB.
     259              :     /// See tests::check_similarity_behaviour for examples of behaviour
     260           21 :     fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
     261           21 :         let margin;
     262           21 :         let diff;
     263           21 :         if self.non_reclaimable >= other.non_reclaimable {
     264           15 :             margin = other.non_reclaimable / 8;
     265           15 :             diff = self.non_reclaimable - other.non_reclaimable;
     266           15 :         } else {
     267            6 :             margin = self.non_reclaimable / 8;
     268            6 :             diff = other.non_reclaimable - self.non_reclaimable;
     269            6 :         }
     270              : 
     271           21 :         diff < margin && diff < 128 * 1024 * 1024
     272           21 :     }
     273              : }
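
As a concrete instance of this rule: readings of 1GiB and 1GiB + 100MiB have a margin of 1GiB / 8 = 128MiB and a difference of 100MiB, so both bounds hold and the readings count as similar. A hypothetical extra test in the same style as the module's existing tests (it would have to live in the `tests` module below, since the type and method are private):

```rust
#[test]
fn similarity_worked_example() {
    // 1 GiB vs. 1 GiB + 100 MiB: margin = 1 GiB / 8 = 128 MiB, diff = 100 MiB.
    let a = super::MemoryStatus { non_reclaimable: 1 << 30 };
    let b = super::MemoryStatus { non_reclaimable: (1 << 30) + 100 * (1 << 20) };

    // diff < margin (relative bound) and diff < 128MiB (absolute cap),
    // so the readings are "close or similar" in both directions.
    assert!(a.status_is_close_or_similar(&b));
    assert!(b.status_is_close_or_similar(&a));
}
```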
     274              : 
     275              : #[cfg(test)]
     276              : mod tests {
     277              :     #[test]
     278            1 :     fn ring_buf_iter() {
     279            1 :         let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9];
     280            1 : 
     281            9 :         let values = |offset, count| {
     282            9 :             super::ring_buf_recent_values_iter(&buf, offset, count)
     283            9 :                 .copied()
     284            9 :                 .collect::<Vec<i32>>()
     285            9 :         };
     286              : 
     287              :         // Boundary conditions: start, end, and entire thing:
     288            1 :         assert_eq!(values(0, 1), [0]);
     289            1 :         assert_eq!(values(3, 4), [0, 1, 2, 3]);
     290            1 :         assert_eq!(values(9, 4), [6, 7, 8, 9]);
     291            1 :         assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
     292              : 
     293              :         // "normal" operation: no wraparound
     294            1 :         assert_eq!(values(7, 4), [4, 5, 6, 7]);
     295              : 
     296              :         // wraparound:
     297            1 :         assert_eq!(values(0, 4), [7, 8, 9, 0]);
     298            1 :         assert_eq!(values(1, 4), [8, 9, 0, 1]);
     299            1 :         assert_eq!(values(2, 4), [9, 0, 1, 2]);
     300            1 :         assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
     301            1 :     }
     302              : 
     303              :     #[test]
     304            1 :     fn check_similarity_behaviour() {
     305            1 :         // This all accesses private methods, so we can't actually run this
     306            1 :         // as doctests, because doctests run as an external crate.
     307            1 :         let mut small = super::MemoryStatus {
     308            1 :             non_reclaimable: 1024,
     309            1 :         };
     310            1 :         let mut large = super::MemoryStatus {
     311            1 :             non_reclaimable: 1024 * 1024 * 1024 * 1024,
     312            1 :         };
     313            1 : 
     314            1 :         // objects are self-similar, no matter the size
     315            1 :         assert!(small.status_is_close_or_similar(&small));
     316            1 :         assert!(large.status_is_close_or_similar(&large));
     317              : 
     318              :         // inequality is symmetric
     319            1 :         assert!(!small.status_is_close_or_similar(&large));
     320            1 :         assert!(!large.status_is_close_or_similar(&small));
     321              : 
     322            1 :         small.non_reclaimable = 64;
     323            1 :         large.non_reclaimable = (small.non_reclaimable / 8) * 9;
     324            1 : 
     325            1 :         // objects are self-similar, no matter the size
     326            1 :         assert!(small.status_is_close_or_similar(&small));
     327            1 :         assert!(large.status_is_close_or_similar(&large));
     328              : 
     329              :         // values are similar if the larger value is larger by less than
     330              :         // 12.5%, i.e. 1/8 of the smaller value.
     331              :         // In the example above, large is exactly 12.5% larger, so this doesn't
     332              :         // match.
     333            1 :         assert!(!small.status_is_close_or_similar(&large));
     334            1 :         assert!(!large.status_is_close_or_similar(&small));
     335              : 
     336            1 :         large.non_reclaimable -= 1;
     337            1 :         assert!(large.status_is_close_or_similar(&large));
     338              : 
     339            1 :         assert!(small.status_is_close_or_similar(&large));
     340            1 :         assert!(large.status_is_close_or_similar(&small));
     341              : 
     342              :         // The 1/8 rule only applies up to 128MiB of difference
     343            1 :         small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
     344            1 :         large.non_reclaimable = small.non_reclaimable / 8 * 9;
     345            1 :         assert!(small.status_is_close_or_similar(&small));
     346            1 :         assert!(large.status_is_close_or_similar(&large));
     347              : 
     348            1 :         assert!(!small.status_is_close_or_similar(&large));
     349            1 :         assert!(!large.status_is_close_or_similar(&small));
      350              :         // the difference is put exactly at the 128MiB threshold (still not similar, since the check is strict)
     351            1 :         large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
     352            1 :         assert!(large.status_is_close_or_similar(&large));
     353              : 
     354            1 :         assert!(!small.status_is_close_or_similar(&large));
     355            1 :         assert!(!large.status_is_close_or_similar(&small));
     356              :         // now below
     357            1 :         large.non_reclaimable -= 1;
     358            1 :         assert!(large.status_is_close_or_similar(&large));
     359              : 
     360            1 :         assert!(small.status_is_close_or_similar(&large));
     361            1 :         assert!(large.status_is_close_or_similar(&small));
     362            1 :     }
     363              : }
        

Generated by: LCOV version 2.1-beta