LCOV - differential code coverage report
Current view: top level - libs/vm_monitor/src - cgroup.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 33.0 % 112 37 75 37
Current Date: 2023-10-19 02:04:12 Functions: 10.7 % 28 3 25 3
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::fmt::{self, Debug, Formatter};
       2                 : use std::time::{Duration, Instant};
       3                 : 
       4                 : use anyhow::{anyhow, Context};
       5                 : use cgroups_rs::{
       6                 :     hierarchies::{self, is_cgroup2_unified_mode},
       7                 :     memory::MemController,
       8                 :     Subsystem,
       9                 : };
      10                 : use tokio::sync::watch;
      11                 : use tracing::{info, warn};
      12                 : 
      13                 : /// Configuration for a `CgroupWatcher`
      14 UBC           0 : #[derive(Debug, Clone)]
      15                 : pub struct Config {
      16                 :     /// Interval at which we should be fetching memory statistics
      17                 :     memory_poll_interval: Duration,
      18                 : 
      19                 :     /// The number of samples used in constructing aggregated memory statistics
      20                 :     memory_history_len: usize,
      21                 :     /// The number of most recent samples that will be periodically logged.
      22                 :     ///
      23                 :     /// Each sample is logged exactly once. Increasing this value means that recent samples will be
      24                 :     /// logged less frequently, and vice versa.
      25                 :     ///
      26                 :     /// For simplicity, this value must be greater than or equal to `memory_history_len`.
      27                 :     memory_history_log_interval: usize,
      28                 : }
      29                 : 
      30                 : impl Default for Config {
      31               0 :     fn default() -> Self {
      32               0 :         Self {
      33               0 :             memory_poll_interval: Duration::from_millis(100),
      34               0 :             memory_history_len: 5, // use 500ms of history for decision-making
      35               0 :             memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
      36               0 :         }
      37               0 :     }
      38                 : }
      39                 : 
      40                 : /// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
      41                 : /// OOM killed or throttling.
      42                 : ///
      43                 : /// The `CgroupWatcher` primarily achieves this by reading from a stream of
      44                 : /// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the
      45                 : /// cgroup happy.
      46               0 : #[derive(Debug)]
      47                 : pub struct CgroupWatcher {
      48                 :     pub config: Config,
      49                 : 
      50                 :     /// The actual cgroup we are watching and managing.
      51                 :     cgroup: cgroups_rs::Cgroup,
      52                 : }
      53                 : 
      54                 : impl CgroupWatcher {
      55                 :     /// Create a new `CgroupWatcher`.
      56               0 :     #[tracing::instrument(skip_all, fields(%name))]
      57                 :     pub fn new(name: String) -> anyhow::Result<Self> {
      58                 :         // TODO: clarify exactly why we need v2
      59                 :         // Make sure cgroups v2 (aka unified) are supported
      60                 :         if !is_cgroup2_unified_mode() {
      61                 :             anyhow::bail!("cgroups v2 not supported");
      62                 :         }
      63                 :         let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
      64                 : 
      65                 :         Ok(Self {
      66                 :             cgroup,
      67                 :             config: Default::default(),
      68                 :         })
      69                 :     }
      70                 : 
      71                 :     /// The entrypoint for the `CgroupWatcher`.
      72               0 :     #[tracing::instrument(skip_all)]
      73                 :     pub async fn watch(
      74                 :         &self,
      75                 :         updates: watch::Sender<(Instant, MemoryHistory)>,
      76                 :     ) -> anyhow::Result<()> {
      77                 :         // this requirement makes the code a bit easier to work with; see the config for more.
      78                 :         assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
      79                 : 
      80                 :         let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
      81                 :         ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
      82                 :         // ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
      83                 : 
      84                 :         let mem_controller = self.memory()?;
      85                 : 
      86                 :         // buffer for samples that will be logged. once full, it remains so.
      87                 :         let history_log_len = self.config.memory_history_log_interval;
      88                 :         let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
      89                 : 
      90                 :         for t in 0_u64.. {
      91                 :             ticker.tick().await;
      92                 : 
      93                 :             let now = Instant::now();
      94                 :             let mem = Self::memory_usage(mem_controller);
      95                 : 
      96                 :             let i = t as usize % history_log_len;
      97                 :             history_log_buf[i] = mem;
      98                 : 
      99                 :             // We're taking *at most* memory_history_len values; we may be bounded by the total
     100                 :             // number of samples that have come in so far.
     101                 :             let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
     102                 :             // NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
     103                 :             // that we just inserted a value there, so the end of the iterator will *include* the
     104                 :             // value at i, rather than stopping just short of it.
     105                 :             let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
     106                 : 
     107                 :             let summary = MemoryHistory {
     108               0 :                 avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
     109                 :                     / samples_count as u64,
     110                 :                 samples_count,
     111                 :                 samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
     112                 :             };
     113                 : 
     114                 :             // Log the current history if it's time to do so. Because `history_log_buf` has length
     115                 :             // equal to the logging interval, we can just log the entire buffer every time we set
     116                 :             // the last entry, which also means that for this log line, we can ignore that it's a
     117                 :             // ring buffer (because all the entries are in order of increasing time).
     118                 :             if i == history_log_len - 1 {
     119               0 :                 info!(
     120               0 :                     history = ?MemoryStatus::debug_slice(&history_log_buf),
     121               0 :                     summary = ?summary,
     122               0 :                     "Recent cgroup memory statistics history"
     123               0 :                 );
     124                 :             }
     125                 : 
     126                 :             updates
     127                 :                 .send((now, summary))
     128                 :                 .context("failed to send MemoryHistory")?;
     129                 :         }
     130                 : 
     131                 :         unreachable!()
     132                 :     }
     133                 : 
     134                 :     /// Get a handle on the memory subsystem.
     135               0 :     fn memory(&self) -> anyhow::Result<&MemController> {
     136               0 :         self.cgroup
     137               0 :             .subsystems()
     138               0 :             .iter()
     139               0 :             .find_map(|sub| match sub {
     140               0 :                 Subsystem::Mem(c) => Some(c),
     141               0 :                 _ => None,
     142               0 :             })
     143               0 :             .ok_or_else(|| anyhow!("could not find memory subsystem"))
     144               0 :     }
     145                 : 
     146                 :     /// Given a handle on the memory subsystem, returns the current memory information
     147               0 :     fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
     148               0 :         let stat = mem_controller.memory_stat().stat;
     149               0 :         MemoryStatus {
     150               0 :             non_reclaimable: stat.active_anon + stat.inactive_anon,
     151               0 :         }
     152               0 :     }
     153                 : }
     154                 : 
     155                 : // Helper function for `CgroupWatcher::watch`
     156 CBC           9 : fn ring_buf_recent_values_iter<T>(
     157               9 :     buf: &[T],
     158               9 :     last_value_idx: usize,
     159               9 :     count: usize,
     160               9 : ) -> impl '_ + Iterator<Item = &T> {
     161               9 :     // Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
     162               9 :     // easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
     163               9 :     assert!(count <= buf.len());
     164                 : 
     165               9 :     buf.iter()
     166               9 :         // 'cycle' because the values could wrap around
     167               9 :         .cycle()
     168               9 :         // with 'cycle', this skip is more like 'offset', and functionally this is
     169               9 :         // offsettting by 'last_value_idx - count (mod buf.len())', but we have to be
     170               9 :         // careful to avoid underflow, so we pre-add buf.len().
     171               9 :         // The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
     172               9 :         .skip((buf.len() + last_value_idx + 1 - count) % buf.len())
     173               9 :         .take(count)
     174               9 : }
     175                 : 
     176                 : /// Summary of recent memory usage
     177 UBC           0 : #[derive(Debug, Copy, Clone)]
     178                 : pub struct MemoryHistory {
     179                 :     /// Rolling average of non-reclaimable memory usage samples over the last `history_period`
     180                 :     pub avg_non_reclaimable: u64,
     181                 : 
     182                 :     /// The number of samples used to construct this summary
     183                 :     pub samples_count: usize,
     184                 :     /// Total timespan between the first and last sample used for this summary
     185                 :     pub samples_span: Duration,
     186                 : }
     187                 : 
     188               0 : #[derive(Debug, Copy, Clone)]
     189                 : pub struct MemoryStatus {
     190                 :     non_reclaimable: u64,
     191                 : }
     192                 : 
     193                 : impl MemoryStatus {
     194               0 :     fn zeroed() -> Self {
     195               0 :         MemoryStatus { non_reclaimable: 0 }
     196               0 :     }
     197                 : 
     198               0 :     fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
     199               0 :         struct DS<'a>(&'a [MemoryStatus]);
     200               0 : 
     201               0 :         impl<'a> Debug for DS<'a> {
     202               0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     203               0 :                 f.debug_struct("[MemoryStatus]")
     204               0 :                     .field(
     205               0 :                         "non_reclaimable[..]",
     206               0 :                         &Fields(self.0, |stat: &MemoryStatus| {
     207               0 :                             BytesToGB(stat.non_reclaimable)
     208               0 :                         }),
     209               0 :                     )
     210               0 :                     .finish()
     211               0 :             }
     212               0 :         }
     213               0 : 
     214               0 :         struct Fields<'a, F>(&'a [MemoryStatus], F);
     215               0 : 
     216               0 :         impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> {
     217               0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     218               0 :                 f.debug_list().entries(self.0.iter().map(&self.1)).finish()
     219               0 :             }
     220               0 :         }
     221               0 : 
     222               0 :         struct BytesToGB(u64);
     223               0 : 
     224               0 :         impl Debug for BytesToGB {
     225               0 :             fn fmt(&self, f: &mut Formatter) -> fmt::Result {
     226               0 :                 f.write_fmt(format_args!(
     227               0 :                     "{:.3}Gi",
     228               0 :                     self.0 as f64 / (1_u64 << 30) as f64
     229               0 :                 ))
     230               0 :             }
     231               0 :         }
     232               0 : 
     233               0 :         DS(slice)
     234               0 :     }
     235                 : }
     236                 : 
     237                 : #[cfg(test)]
     238                 : mod tests {
     239 CBC           1 :     #[test]
     240               1 :     fn ring_buf_iter() {
     241               1 :         let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9];
     242               1 : 
     243               9 :         let values = |offset, count| {
     244               9 :             super::ring_buf_recent_values_iter(&buf, offset, count)
     245               9 :                 .copied()
     246               9 :                 .collect::<Vec<i32>>()
     247               9 :         };
     248                 : 
     249                 :         // Boundary conditions: start, end, and entire thing:
     250               1 :         assert_eq!(values(0, 1), [0]);
     251               1 :         assert_eq!(values(3, 4), [0, 1, 2, 3]);
     252               1 :         assert_eq!(values(9, 4), [6, 7, 8, 9]);
     253               1 :         assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
     254                 : 
     255                 :         // "normal" operation: no wraparound
     256               1 :         assert_eq!(values(7, 4), [4, 5, 6, 7]);
     257                 : 
     258                 :         // wraparound:
     259               1 :         assert_eq!(values(0, 4), [7, 8, 9, 0]);
     260               1 :         assert_eq!(values(1, 4), [8, 9, 0, 1]);
     261               1 :         assert_eq!(values(2, 4), [9, 0, 1, 2]);
     262               1 :         assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
     263               1 :     }
     264                 : }
        

Generated by: LCOV version 2.1-beta