LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - cgroup.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 35.3 % 102 36
Test Date: 2024-05-10 13:18:37 Functions: 15.0 % 20 3

            Line data    Source code
       1              : use std::fmt::{self, Debug, Formatter};
       2              : use std::time::{Duration, Instant};
       3              : 
       4              : use anyhow::{anyhow, Context};
       5              : use cgroups_rs::{
       6              :     hierarchies::{self, is_cgroup2_unified_mode},
       7              :     memory::MemController,
       8              :     Subsystem,
       9              : };
      10              : use tokio::sync::watch;
      11              : use tracing::{info, warn};
      12              : 
      13              : /// Configuration for a `CgroupWatcher`
      14              : #[derive(Debug, Clone)]
      15              : pub struct Config {
      16              :     /// Interval at which we should be fetching memory statistics
      17              :     memory_poll_interval: Duration,
      18              : 
      19              :     /// The number of samples used in constructing aggregated memory statistics
      20              :     memory_history_len: usize,
      21              :     /// The number of most recent samples that will be periodically logged.
      22              :     ///
      23              :     /// Each sample is logged exactly once. Increasing this value means that recent samples will be
      24              :     /// logged less frequently, and vice versa.
      25              :     ///
      26              :     /// For simplicity, this value must be greater than or equal to `memory_history_len`.
      27              :     memory_history_log_interval: usize,
      28              : }
      29              : 
      30              : impl Default for Config {
      31            0 :     fn default() -> Self {
      32            0 :         Self {
      33            0 :             memory_poll_interval: Duration::from_millis(100),
      34            0 :             memory_history_len: 5, // use 500ms of history for decision-making
      35            0 :             memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
      36            0 :         }
      37            0 :     }
      38              : }
      39              : 
      40              : /// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
      41              : /// OOM killed or throttling.
      42              : ///
      43              : /// The `CgroupWatcher` primarily achieves this by reading from a stream of
      44              : /// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the
      45              : /// cgroup happy.
      46              : #[derive(Debug)]
      47              : pub struct CgroupWatcher {
      48              :     pub config: Config,
      49              : 
      50              :     /// The actual cgroup we are watching and managing.
      51              :     cgroup: cgroups_rs::Cgroup,
      52              : }
      53              : 
      54              : impl CgroupWatcher {
      55              :     /// Create a new `CgroupWatcher`.
      56            0 :     #[tracing::instrument(skip_all, fields(%name))]
      57              :     pub fn new(name: String) -> anyhow::Result<Self> {
      58              :         // TODO: clarify exactly why we need v2
      59              :         // Make sure cgroups v2 (aka unified) are supported
      60              :         if !is_cgroup2_unified_mode() {
      61              :             anyhow::bail!("cgroups v2 not supported");
      62              :         }
      63              :         let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
      64              : 
      65              :         Ok(Self {
      66              :             cgroup,
      67              :             config: Default::default(),
      68              :         })
      69              :     }
      70              : 
      71              :     /// The entrypoint for the `CgroupWatcher`.
      72            0 :     #[tracing::instrument(skip_all)]
      73              :     pub async fn watch(
      74              :         &self,
      75              :         updates: watch::Sender<(Instant, MemoryHistory)>,
      76              :     ) -> anyhow::Result<()> {
      77              :         // this requirement makes the code a bit easier to work with; see the config for more.
      78              :         assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
      79              : 
      80              :         let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
      81              :         ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
      82              :         // ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
      83              : 
      84              :         let mem_controller = self.memory()?;
      85              : 
      86              :         // buffer for samples that will be logged. once full, it remains so.
      87              :         let history_log_len = self.config.memory_history_log_interval;
      88              :         let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
      89              : 
      90              :         for t in 0_u64.. {
      91              :             ticker.tick().await;
      92              : 
      93              :             let now = Instant::now();
      94              :             let mem = Self::memory_usage(mem_controller);
      95              : 
      96              :             let i = t as usize % history_log_len;
      97              :             history_log_buf[i] = mem;
      98              : 
      99              :             // We're taking *at most* memory_history_len values; we may be bounded by the total
     100              :             // number of samples that have come in so far.
     101              :             let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
     102              :             // NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
     103              :             // that we just inserted a value there, so the end of the iterator will *include* the
     104              :             // value at i, rather than stopping just short of it.
     105              :             let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
     106              : 
     107              :             let summary = MemoryHistory {
     108            0 :                 avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
     109              :                     / samples_count as u64,
     110              :                 samples_count,
     111              :                 samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
     112              :             };
     113              : 
     114              :             // Log the current history if it's time to do so. Because `history_log_buf` has length
     115              :             // equal to the logging interval, we can just log the entire buffer every time we set
     116              :             // the last entry, which also means that for this log line, we can ignore that it's a
     117              :             // ring buffer (because all the entries are in order of increasing time).
     118              :             if i == history_log_len - 1 {
     119              :                 info!(
     120              :                     history = ?MemoryStatus::debug_slice(&history_log_buf),
     121              :                     summary = ?summary,
     122              :                     "Recent cgroup memory statistics history"
     123              :                 );
     124              :             }
     125              : 
     126              :             updates
     127              :                 .send((now, summary))
     128              :                 .context("failed to send MemoryHistory")?;
     129              :         }
     130              : 
     131              :         unreachable!()
     132              :     }
     133              : 
     134              :     /// Get a handle on the memory subsystem.
     135            0 :     fn memory(&self) -> anyhow::Result<&MemController> {
     136            0 :         self.cgroup
     137            0 :             .subsystems()
     138            0 :             .iter()
     139            0 :             .find_map(|sub| match sub {
     140            0 :                 Subsystem::Mem(c) => Some(c),
     141            0 :                 _ => None,
     142            0 :             })
     143            0 :             .ok_or_else(|| anyhow!("could not find memory subsystem"))
     144            0 :     }
     145              : 
     146              :     /// Given a handle on the memory subsystem, returns the current memory information
     147            0 :     fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
     148            0 :         let stat = mem_controller.memory_stat().stat;
     149            0 :         MemoryStatus {
     150            0 :             non_reclaimable: stat.active_anon + stat.inactive_anon,
     151            0 :         }
     152            0 :     }
     153              : }
     154              : 
     155              : // Helper function for `CgroupWatcher::watch`
     156           18 : fn ring_buf_recent_values_iter<T>(
     157           18 :     buf: &[T],
     158           18 :     last_value_idx: usize,
     159           18 :     count: usize,
     160           18 : ) -> impl '_ + Iterator<Item = &T> {
     161           18 :     // Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
     162           18 :     // easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
     163           18 :     assert!(count <= buf.len());
     164              : 
     165           18 :     buf.iter()
     166           18 :         // 'cycle' because the values could wrap around
     167           18 :         .cycle()
     168           18 :         // with 'cycle', this skip is more like 'offset', and functionally this is
     169           18 :         // offsettting by 'last_value_idx - count (mod buf.len())', but we have to be
     170           18 :         // careful to avoid underflow, so we pre-add buf.len().
     171           18 :         // The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
     172           18 :         .skip((buf.len() + last_value_idx + 1 - count) % buf.len())
     173           18 :         .take(count)
     174           18 : }
     175              : 
     176              : /// Summary of recent memory usage
     177              : #[derive(Debug, Copy, Clone)]
     178              : pub struct MemoryHistory {
     179              :     /// Rolling average of non-reclaimable memory usage samples over the last `history_period`
     180              :     pub avg_non_reclaimable: u64,
     181              : 
     182              :     /// The number of samples used to construct this summary
     183              :     pub samples_count: usize,
     184              :     /// Total timespan between the first and last sample used for this summary
     185              :     pub samples_span: Duration,
     186              : }
     187              : 
     188              : #[derive(Debug, Copy, Clone)]
     189              : pub struct MemoryStatus {
     190              :     non_reclaimable: u64,
     191              : }
     192              : 
     193              : impl MemoryStatus {
     194            0 :     fn zeroed() -> Self {
     195            0 :         MemoryStatus { non_reclaimable: 0 }
     196            0 :     }
     197              : 
     198            0 :     fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
     199            0 :         struct DS<'a>(&'a [MemoryStatus]);
     200            0 : 
     201            0 :         impl<'a> Debug for DS<'a> {
     202            0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     203            0 :                 f.debug_struct("[MemoryStatus]")
     204            0 :                     .field(
     205            0 :                         "non_reclaimable[..]",
     206            0 :                         &Fields(self.0, |stat: &MemoryStatus| {
     207            0 :                             BytesToGB(stat.non_reclaimable)
     208            0 :                         }),
     209            0 :                     )
     210            0 :                     .finish()
     211            0 :             }
     212            0 :         }
     213            0 : 
     214            0 :         struct Fields<'a, F>(&'a [MemoryStatus], F);
     215            0 : 
     216            0 :         impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> {
     217            0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     218            0 :                 f.debug_list().entries(self.0.iter().map(&self.1)).finish()
     219            0 :             }
     220            0 :         }
     221            0 : 
     222            0 :         struct BytesToGB(u64);
     223            0 : 
     224            0 :         impl Debug for BytesToGB {
     225            0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     226            0 :                 f.write_fmt(format_args!(
     227            0 :                     "{:.3}Gi",
     228            0 :                     self.0 as f64 / (1_u64 << 30) as f64
     229            0 :                 ))
     230            0 :             }
     231            0 :         }
     232            0 : 
     233            0 :         DS(slice)
     234            0 :     }
     235              : }
     236              : 
     237              : #[cfg(test)]
     238              : mod tests {
     239              :     #[test]
     240            2 :     fn ring_buf_iter() {
     241            2 :         let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9];
     242            2 : 
     243           18 :         let values = |offset, count| {
     244           18 :             super::ring_buf_recent_values_iter(&buf, offset, count)
     245           18 :                 .copied()
     246           18 :                 .collect::<Vec<i32>>()
     247           18 :         };
     248              : 
     249              :         // Boundary conditions: start, end, and entire thing:
     250            2 :         assert_eq!(values(0, 1), [0]);
     251            2 :         assert_eq!(values(3, 4), [0, 1, 2, 3]);
     252            2 :         assert_eq!(values(9, 4), [6, 7, 8, 9]);
     253            2 :         assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
     254              : 
     255              :         // "normal" operation: no wraparound
     256            2 :         assert_eq!(values(7, 4), [4, 5, 6, 7]);
     257              : 
     258              :         // wraparound:
     259            2 :         assert_eq!(values(0, 4), [7, 8, 9, 0]);
     260            2 :         assert_eq!(values(1, 4), [8, 9, 0, 1]);
     261            2 :         assert_eq!(values(2, 4), [9, 0, 1, 2]);
     262            2 :         assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
     263            2 :     }
     264              : }
        

Generated by: LCOV version 2.1-beta