Line data Source code
1 : use std::fmt::{self, Debug, Formatter};
2 : use std::time::{Duration, Instant};
3 :
4 : use anyhow::{Context, anyhow};
5 : use cgroups_rs::Subsystem;
6 : use cgroups_rs::hierarchies::{self, is_cgroup2_unified_mode};
7 : use cgroups_rs::memory::MemController;
8 : use tokio::sync::watch;
9 : use tracing::{info, warn};
10 :
/// Configuration for a `CgroupWatcher`
#[derive(Debug, Clone)]
pub struct Config {
    /// Interval at which we should be fetching memory statistics
    memory_poll_interval: Duration,

    /// The number of samples used in constructing aggregated memory statistics
    memory_history_len: usize,
    /// The number of most recent samples that will be periodically logged.
    ///
    /// Samples are logged in batches of this size, so each sample is logged at most once (a
    /// batch may be skipped entirely; see `memory_history_log_noskip_interval`). Increasing
    /// this value means that recent samples will be logged less frequently, and vice versa.
    ///
    /// For simplicity, this value must be greater than or equal to `memory_history_len`.
    memory_history_log_interval: usize,
    /// The maximum amount of time for which logging may be skipped.
    ///
    /// A batch of samples is not logged while memory usage stays similar to what was last
    /// logged; once this much time has passed since the last log line, the next complete batch
    /// is logged regardless.
    memory_history_log_noskip_interval: Duration,
}

impl Default for Config {
    /// Returns the default configuration: poll every 100ms, summarize the last 5 samples, and
    /// log batches of 20 samples, skipping unchanged batches for at most 15 seconds.
    fn default() -> Self {
        Self {
            memory_poll_interval: Duration::from_millis(100),
            // use 5 samples (roughly 500ms of history) for decision-making
            memory_history_len: 5,
            // but only log every 20 samples, i.e. ~2s (otherwise it's spammy)
            memory_history_log_interval: 20,
            // and only if usage has changed, or 15 seconds have passed since the last log
            memory_history_log_noskip_interval: Duration::from_secs(15),
        }
    }
}
40 :
41 : /// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
42 : /// OOM killed or throttling.
43 : ///
44 : /// The `CgroupWatcher` primarily achieves this by reading from a stream of
45 : /// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the
46 : /// cgroup happy.
47 : #[derive(Debug)]
48 : pub struct CgroupWatcher {
49 : pub config: Config,
50 :
51 : /// The actual cgroup we are watching and managing.
52 : cgroup: cgroups_rs::Cgroup,
53 : }
54 :
55 : impl CgroupWatcher {
56 : /// Create a new `CgroupWatcher`.
57 : #[tracing::instrument(skip_all, fields(%name))]
58 : pub fn new(name: String) -> anyhow::Result<Self> {
59 : // TODO: clarify exactly why we need v2
60 : // Make sure cgroups v2 (aka unified) are supported
61 : if !is_cgroup2_unified_mode() {
62 : anyhow::bail!("cgroups v2 not supported");
63 : }
64 : let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
65 :
66 : Ok(Self {
67 : cgroup,
68 : config: Default::default(),
69 : })
70 : }
71 :
72 : /// The entrypoint for the `CgroupWatcher`.
73 : #[tracing::instrument(skip_all)]
74 : pub async fn watch(
75 : &self,
76 : updates: watch::Sender<(Instant, MemoryHistory)>,
77 : ) -> anyhow::Result<()> {
78 : // this requirement makes the code a bit easier to work with; see the config for more.
79 : assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
80 :
81 : let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
82 : ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
83 : // ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
84 :
85 : let mem_controller = self.memory()?;
86 :
87 : // buffer for samples that will be logged. once full, it remains so.
88 : let history_log_len = self.config.memory_history_log_interval;
89 : let max_skip = self.config.memory_history_log_noskip_interval;
90 : let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
91 : let mut last_logged_memusage = MemoryStatus::zeroed();
92 :
93 : // Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
94 : let mut can_skip_logs_until = Instant::now() - max_skip;
95 :
96 : for t in 0_u64.. {
97 : ticker.tick().await;
98 :
99 : let now = Instant::now();
100 : let mem = Self::memory_usage(mem_controller);
101 :
102 : let i = t as usize % history_log_len;
103 : history_log_buf[i] = mem;
104 :
105 : // We're taking *at most* memory_history_len values; we may be bounded by the total
106 : // number of samples that have come in so far.
107 : let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
108 : // NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
109 : // that we just inserted a value there, so the end of the iterator will *include* the
110 : // value at i, rather than stopping just short of it.
111 : let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
112 :
113 : let summary = MemoryHistory {
114 0 : avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
115 : / samples_count as u64,
116 : samples_count,
117 : samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
118 : };
119 :
120 : // Log the current history if it's time to do so. Because `history_log_buf` has length
121 : // equal to the logging interval, we can just log the entire buffer every time we set
122 : // the last entry, which also means that for this log line, we can ignore that it's a
123 : // ring buffer (because all the entries are in order of increasing time).
124 : //
125 : // We skip logging the data if data hasn't meaningfully changed in a while, unless
126 : // we've already ignored previous iterations for the last max_skip period.
127 : if i == history_log_len - 1
128 : && (now > can_skip_logs_until
129 : || !history_log_buf
130 : .iter()
131 0 : .all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
132 : {
133 : info!(
134 : history = ?MemoryStatus::debug_slice(&history_log_buf),
135 : summary = ?summary,
136 : "Recent cgroup memory statistics history"
137 : );
138 :
139 : can_skip_logs_until = now + max_skip;
140 :
141 : last_logged_memusage = *history_log_buf.last().unwrap();
142 : }
143 :
144 : updates
145 : .send((now, summary))
146 : .context("failed to send MemoryHistory")?;
147 : }
148 :
149 : unreachable!()
150 : }
151 :
152 : /// Get a handle on the memory subsystem.
153 0 : fn memory(&self) -> anyhow::Result<&MemController> {
154 0 : self.cgroup
155 0 : .subsystems()
156 0 : .iter()
157 0 : .find_map(|sub| match sub {
158 0 : Subsystem::Mem(c) => Some(c),
159 0 : _ => None,
160 0 : })
161 0 : .ok_or_else(|| anyhow!("could not find memory subsystem"))
162 0 : }
163 :
164 : /// Given a handle on the memory subsystem, returns the current memory information
165 0 : fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
166 0 : let stat = mem_controller.memory_stat().stat;
167 0 : MemoryStatus {
168 0 : non_reclaimable: stat.active_anon + stat.inactive_anon,
169 0 : }
170 0 : }
171 : }
172 :
// Helper function for `CgroupWatcher::watch`
//
// Returns an iterator over the `count` most recently written values of the ring buffer `buf`,
// from oldest to newest, where `last_value_idx` is the index of the newest value (inclusive).
//
// Panics if `count` is greater than `buf.len()`. An empty buffer (for which `count` must be 0)
// yields an empty iterator.
fn ring_buf_recent_values_iter<T>(
    buf: &[T],
    last_value_idx: usize,
    count: usize,
) -> impl '_ + Iterator<Item = &T> {
    // Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
    // easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
    assert!(count <= buf.len());

    // Index of the oldest requested value: 'last_value_idx + 1 - count (mod buf.len())'. We
    // pre-add buf.len() to avoid underflow; the '+ 1' is because `last_value_idx` is inclusive,
    // rather than exclusive. `checked_rem` only returns `None` for an empty buffer, in which
    // case there is nothing to iterate over and any start index will do.
    let start = (buf.len() + last_value_idx + 1 - count)
        .checked_rem(buf.len())
        .unwrap_or(0);

    // The requested values run from `start` to the end of the buffer and then, because the
    // values could wrap around, continue from the beginning of the buffer.
    let (wrapped, oldest) = buf.split_at(start);
    oldest.iter().chain(wrapped).take(count)
}
193 :
/// Summary of recent memory usage
///
/// One of these is produced by `CgroupWatcher::watch` for every memory sample it takes.
#[derive(Debug, Copy, Clone)]
pub struct MemoryHistory {
    /// Rolling average of non-reclaimable memory usage over the `samples_count` most recent
    /// samples (computed with integer division)
    pub avg_non_reclaimable: u64,

    /// The number of samples used to construct this summary
    pub samples_count: usize,
    /// Total timespan between the first and last sample used for this summary, computed as the
    /// poll interval multiplied by `samples_count - 1`
    pub samples_span: Duration,
}
205 :
/// A single sample of a cgroup's memory usage.
#[derive(Debug, Copy, Clone)]
pub struct MemoryStatus {
    /// Amount of non-reclaimable memory in use at the time of the sample, in bytes.
    non_reclaimable: u64,
}

impl MemoryStatus {
    /// Returns a sample with all counters set to zero, used as a placeholder before any real
    /// sample has been taken.
    fn zeroed() -> Self {
        MemoryStatus { non_reclaimable: 0 }
    }

    /// Returns a value whose `Debug` output shows a slice of samples field by field: one list
    /// per field, with every entry formatted in GiB to three decimal places.
    fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
        /// The whole slice, printed like a struct whose fields are lists.
        struct DS<'a>(&'a [MemoryStatus]);

        impl Debug for DS<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                f.debug_struct("[MemoryStatus]")
                    .field(
                        "non_reclaimable[..]",
                        &Fields(self.0, |stat: &MemoryStatus| {
                            BytesToGB(stat.non_reclaimable)
                        }),
                    )
                    .finish()
            }
        }

        /// One field of every sample in the slice, extracted by the given function and printed
        /// as a list.
        struct Fields<'a, F>(&'a [MemoryStatus], F);

        impl<F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'_, F> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                f.debug_list().entries(self.0.iter().map(&self.1)).finish()
            }
        }

        /// A byte count, printed in units of 2^30 bytes.
        struct BytesToGB(u64);

        impl Debug for BytesToGB {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                write!(f, "{:.3}Gi", self.0 as f64 / (1_u64 << 30) as f64)
            }
        }

        DS(slice)
    }

    /// Check if the other memory status is a close or similar result.
    ///
    /// Returns true if the two values are equal, or if the larger value exceeds the smaller
    /// value by less than 1/8 of the smaller value and by less than 128MiB.
    /// See tests::check_similarity_behaviour for examples of behaviour
    fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
        /// Differences of this many bytes or more are never considered similar.
        const MAX_ABSOLUTE_DIFF: u64 = 128 * 1024 * 1024;

        let smaller = self.non_reclaimable.min(other.non_reclaimable);
        let larger = self.non_reclaimable.max(other.non_reclaimable);
        let diff = larger - smaller;
        let margin = smaller / 8;

        // Equal values are always similar. This has to be checked explicitly: for values below
        // 8 the margin rounds down to zero, so the strict comparison alone would report such a
        // value as not even similar to itself.
        diff == 0 || (diff < margin && diff < MAX_ABSOLUTE_DIFF)
    }
}
272 :
#[cfg(test)]
mod tests {
    use super::{MemoryStatus, ring_buf_recent_values_iter};

    const MIB: u64 = 1024 * 1024;
    const TIB: u64 = MIB * MIB;

    /// Builds a sample with the given amount of non-reclaimable memory.
    fn status(non_reclaimable: u64) -> MemoryStatus {
        MemoryStatus { non_reclaimable }
    }

    /// Asserts that a sample is considered similar to itself.
    fn assert_self_similar(s: MemoryStatus) {
        assert!(s.status_is_close_or_similar(&s));
    }

    /// Asserts that the two samples are considered similar, in both directions.
    fn assert_similar(a: MemoryStatus, b: MemoryStatus) {
        assert!(a.status_is_close_or_similar(&b));
        assert!(b.status_is_close_or_similar(&a));
    }

    /// Asserts that the two samples are considered different, in both directions.
    fn assert_different(a: MemoryStatus, b: MemoryStatus) {
        assert!(!a.status_is_close_or_similar(&b));
        assert!(!b.status_is_close_or_similar(&a));
    }

    #[test]
    fn ring_buf_iter() {
        let buf: Vec<i32> = (0..10).collect();

        // (index of the newest value, number of values requested, expected values)
        let cases: [(usize, usize, &[i32]); 9] = [
            // Boundary conditions: start, end, and entire thing:
            (0, 1, &[0]),
            (3, 4, &[0, 1, 2, 3]),
            (9, 4, &[6, 7, 8, 9]),
            (9, 10, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
            // "normal" operation: no wraparound
            (7, 4, &[4, 5, 6, 7]),
            // wraparound:
            (0, 4, &[7, 8, 9, 0]),
            (1, 4, &[8, 9, 0, 1]),
            (2, 4, &[9, 0, 1, 2]),
            (2, 10, &[3, 4, 5, 6, 7, 8, 9, 0, 1, 2]),
        ];

        for &(offset, count, expected) in cases.iter() {
            let actual: Vec<i32> = ring_buf_recent_values_iter(&buf, offset, count)
                .copied()
                .collect();
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn check_similarity_behaviour() {
        // This all accesses private methods, so we can't actually run this
        // as doctests, because doctests run as an external crate.
        let mut small = status(1024);
        let mut large = status(TIB);

        // objects are self-similar, no matter the size
        assert_self_similar(small);
        assert_self_similar(large);

        // inequality is symmetric
        assert_different(small, large);

        small.non_reclaimable = 64;
        large.non_reclaimable = (small.non_reclaimable / 8) * 9;

        // objects are self-similar, no matter the size
        assert_self_similar(small);
        assert_self_similar(large);

        // values are similar if the larger value is larger by less than
        // 12.5%, i.e. 1/8 of the smaller value.
        // In the example above, large is exactly 12.5% larger, so this doesn't
        // match.
        assert_different(small, large);

        large.non_reclaimable -= 1;
        assert_self_similar(large);
        assert_similar(small, large);

        // The 1/8 rule only applies up to 128MiB of difference
        small.non_reclaimable = TIB;
        large.non_reclaimable = small.non_reclaimable / 8 * 9;
        assert_self_similar(small);
        assert_self_similar(large);
        assert_different(small, large);

        // the large value is put just above the threshold
        large.non_reclaimable = small.non_reclaimable + 128 * MIB;
        assert_self_similar(large);
        assert_different(small, large);

        // now below
        large.non_reclaimable -= 1;
        assert_self_similar(large);
        assert_similar(small, large);
    }
}
|