Line data Source code
1 : use std::fmt::{self, Debug, Formatter};
2 : use std::time::{Duration, Instant};
3 :
4 : use anyhow::{anyhow, Context};
5 : use cgroups_rs::{
6 : hierarchies::{self, is_cgroup2_unified_mode},
7 : memory::MemController,
8 : Subsystem,
9 : };
10 : use tokio::sync::watch;
11 : use tracing::{info, warn};
12 :
/// Tunable settings for a `CgroupWatcher`.
#[derive(Debug, Clone)]
pub struct Config {
    /// How often memory statistics are sampled.
    memory_poll_interval: Duration,

    /// How many of the most recent samples are aggregated into each memory summary.
    memory_history_len: usize,
    /// How many samples are collected between two consecutive history log lines.
    ///
    /// Every sample is logged exactly once, so a larger value means the history is logged less
    /// often (with more samples per log line), and a smaller value means it is logged more often.
    ///
    /// For simplicity, this value must be greater than or equal to `memory_history_len`.
    memory_history_log_interval: usize,
}

impl Default for Config {
    fn default() -> Self {
        // Sample every 100ms.
        const POLL_INTERVAL: Duration = Duration::from_millis(100);
        // 5 samples at 100ms each: use 500ms of history for decision-making...
        const HISTORY_LEN: usize = 5;
        // ...but only log every ~2s (20 samples), otherwise it's spammy.
        const LOG_INTERVAL: usize = 20;

        Config {
            memory_poll_interval: POLL_INTERVAL,
            memory_history_len: HISTORY_LEN,
            memory_history_log_interval: LOG_INTERVAL,
        }
    }
}
39 :
/// Watches the memory usage of a single cgroup.
///
/// The work is done by [`CgroupWatcher::watch`], which periodically samples the cgroup's memory
/// statistics and publishes a rolling summary of them on a `tokio::sync::watch` channel.
///
/// NOTE(review): this comment previously described the watcher as responding to a stream of
/// `MonitorEvent`s (see `main_signals_loop`) in order to keep the cgroup from being OOM killed
/// or throttled. Neither of those names is defined in this file -- confirm whether that
/// description still applies anywhere.
#[derive(Debug)]
pub struct CgroupWatcher {
    /// Polling interval and history sizes used by `watch`.
    pub config: Config,

    /// The actual cgroup we are watching and managing.
    cgroup: cgroups_rs::Cgroup,
}
53 :
54 : impl CgroupWatcher {
55 : /// Create a new `CgroupWatcher`.
56 0 : #[tracing::instrument(skip_all, fields(%name))]
57 : pub fn new(name: String) -> anyhow::Result<Self> {
58 : // TODO: clarify exactly why we need v2
59 : // Make sure cgroups v2 (aka unified) are supported
60 : if !is_cgroup2_unified_mode() {
61 : anyhow::bail!("cgroups v2 not supported");
62 : }
63 : let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
64 :
65 : Ok(Self {
66 : cgroup,
67 : config: Default::default(),
68 : })
69 : }
70 :
71 : /// The entrypoint for the `CgroupWatcher`.
72 0 : #[tracing::instrument(skip_all)]
73 : pub async fn watch(
74 : &self,
75 : updates: watch::Sender<(Instant, MemoryHistory)>,
76 : ) -> anyhow::Result<()> {
77 : // this requirement makes the code a bit easier to work with; see the config for more.
78 : assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
79 :
80 : let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
81 : ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
82 : // ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
83 :
84 : let mem_controller = self.memory()?;
85 :
86 : // buffer for samples that will be logged. once full, it remains so.
87 : let history_log_len = self.config.memory_history_log_interval;
88 : let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
89 :
90 : for t in 0_u64.. {
91 : ticker.tick().await;
92 :
93 : let now = Instant::now();
94 : let mem = Self::memory_usage(mem_controller);
95 :
96 : let i = t as usize % history_log_len;
97 : history_log_buf[i] = mem;
98 :
99 : // We're taking *at most* memory_history_len values; we may be bounded by the total
100 : // number of samples that have come in so far.
101 : let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
102 : // NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
103 : // that we just inserted a value there, so the end of the iterator will *include* the
104 : // value at i, rather than stopping just short of it.
105 : let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
106 :
107 : let summary = MemoryHistory {
108 0 : avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
109 : / samples_count as u64,
110 : samples_count,
111 : samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
112 : };
113 :
114 : // Log the current history if it's time to do so. Because `history_log_buf` has length
115 : // equal to the logging interval, we can just log the entire buffer every time we set
116 : // the last entry, which also means that for this log line, we can ignore that it's a
117 : // ring buffer (because all the entries are in order of increasing time).
118 : if i == history_log_len - 1 {
119 0 : info!(
120 0 : history = ?MemoryStatus::debug_slice(&history_log_buf),
121 0 : summary = ?summary,
122 0 : "Recent cgroup memory statistics history"
123 0 : );
124 : }
125 :
126 : updates
127 : .send((now, summary))
128 : .context("failed to send MemoryHistory")?;
129 : }
130 :
131 : unreachable!()
132 : }
133 :
134 : /// Get a handle on the memory subsystem.
135 0 : fn memory(&self) -> anyhow::Result<&MemController> {
136 0 : self.cgroup
137 0 : .subsystems()
138 0 : .iter()
139 0 : .find_map(|sub| match sub {
140 0 : Subsystem::Mem(c) => Some(c),
141 0 : _ => None,
142 0 : })
143 0 : .ok_or_else(|| anyhow!("could not find memory subsystem"))
144 0 : }
145 :
146 : /// Given a handle on the memory subsystem, returns the current memory information
147 0 : fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
148 0 : let stat = mem_controller.memory_stat().stat;
149 0 : MemoryStatus {
150 0 : non_reclaimable: stat.active_anon + stat.inactive_anon,
151 0 : }
152 0 : }
153 : }
154 :
/// Helper function for `CgroupWatcher::watch`: iterates over the `count` most recent values of
/// the ring buffer `buf`, oldest first.
///
/// `last_value_idx` is the index of the most recently written value and is *inclusive*: the
/// value stored there is the last item the iterator yields. If fewer than `count` values precede
/// it in the slice, the iterator wraps around and starts near the end of `buf`.
///
/// An empty `buf` (which forces `count == 0`) yields an empty iterator.
///
/// # Panics
///
/// Panics if `count > buf.len()`.
fn ring_buf_recent_values_iter<T>(
    buf: &[T],
    last_value_idx: usize,
    count: usize,
) -> impl '_ + Iterator<Item = &T> {
    // Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
    // easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
    assert!(count <= buf.len());

    // Functionally the start offset is 'last_value_idx - count (mod buf.len())', but we have to
    // be careful to avoid underflow, so we pre-add buf.len().
    // The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
    let start = if buf.is_empty() {
        // Nothing to offset into; also avoids taking a remainder with a zero divisor.
        0
    } else {
        (buf.len() + last_value_idx + 1 - count) % buf.len()
    };

    buf.iter()
        // 'cycle' because the values could wrap around
        .cycle()
        // with 'cycle', this skip is more like 'offset'
        .skip(start)
        .take(count)
}
175 :
/// Summary of recent memory usage
#[derive(Debug, Copy, Clone)]
pub struct MemoryHistory {
    /// Rolling average of non-reclaimable memory usage over the `samples_count` most recent
    /// samples. `CgroupWatcher::watch` computes it with integer division, so it is rounded down.
    pub avg_non_reclaimable: u64,

    /// The number of samples used to construct this summary
    pub samples_count: usize,
    /// Total timespan between the first and last sample used for this summary.
    ///
    /// `CgroupWatcher::watch` derives this as the poll interval times `samples_count - 1`, so it
    /// is the nominal span rather than a measured one.
    pub samples_span: Duration,
}
187 :
/// A single sample of a cgroup's memory usage.
#[derive(Debug, Copy, Clone)]
pub struct MemoryStatus {
    /// Amount of memory counted as non-reclaimable at the time of the sample.
    // NOTE(review): presumably in bytes, given how `debug_slice` formats it -- confirm.
    non_reclaimable: u64,
}

impl MemoryStatus {
    /// Placeholder sample with every counter set to zero.
    fn zeroed() -> Self {
        Self { non_reclaimable: 0 }
    }

    /// Wraps `slice` in a value whose `Debug` output prints the samples column-wise: one
    /// struct-like entry with a list of all `non_reclaimable` values, each divided by 2^30 and
    /// printed with three decimal places and a `Gi` suffix.
    fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
        /// A single value, rendered as a multiple of 2^30.
        struct Gibi(u64);

        impl Debug for Gibi {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let scaled = self.0 as f64 / (1_u64 << 30) as f64;
                write!(f, "{:.3}Gi", scaled)
            }
        }

        /// The `non_reclaimable` column of a slice of samples, rendered as a list.
        struct NonReclaimableColumn<'a>(&'a [MemoryStatus]);

        impl<'a> Debug for NonReclaimableColumn<'a> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let mut list = f.debug_list();
                for sample in self.0 {
                    list.entry(&Gibi(sample.non_reclaimable));
                }
                list.finish()
            }
        }

        /// The whole slice, rendered as a struct with one field per column.
        struct SliceView<'a>(&'a [MemoryStatus]);

        impl<'a> Debug for SliceView<'a> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let column = NonReclaimableColumn(self.0);
                f.debug_struct("[MemoryStatus]")
                    .field("non_reclaimable[..]", &column)
                    .finish()
            }
        }

        SliceView(slice)
    }
}
236 :
#[cfg(test)]
mod tests {
    use super::ring_buf_recent_values_iter;

    /// Checks `ring_buf_recent_values_iter` against a ten-element buffer whose values equal
    /// their own indices, so the expected output doubles as the list of indices visited.
    #[test]
    fn ring_buf_iter() {
        let buf: Vec<i32> = (0..10).collect();

        // Each case is (last_value_idx, count, expected values in iteration order).
        let cases: [(usize, usize, &[i32]); 9] = [
            // Boundary conditions: start, end, and entire thing:
            (0, 1, &[0]),
            (3, 4, &[0, 1, 2, 3]),
            (9, 4, &[6, 7, 8, 9]),
            (9, 10, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
            // "normal" operation: no wraparound
            (7, 4, &[4, 5, 6, 7]),
            // wraparound:
            (0, 4, &[7, 8, 9, 0]),
            (1, 4, &[8, 9, 0, 1]),
            (2, 4, &[9, 0, 1, 2]),
            (2, 10, &[3, 4, 5, 6, 7, 8, 9, 0, 1, 2]),
        ];

        for &(last_value_idx, count, expected) in cases.iter() {
            let got: Vec<i32> = ring_buf_recent_values_iter(&buf, last_value_idx, count)
                .copied()
                .collect();
            assert_eq!(got, expected);
        }
    }
}
|