use std::{
    fmt::{Debug, Display},
    fs,
    pin::pin,
    sync::atomic::{AtomicU64, Ordering},
};

use anyhow::{anyhow, bail, Context};
use cgroups_rs::{
    freezer::FreezerController,
    hierarchies::{self, is_cgroup2_unified_mode, UNIFIED_MOUNTPOINT},
    memory::MemController,
    MaxValue,
    Subsystem::{Freezer, Mem},
};
use inotify::{EventStream, Inotify, WatchMask};
use tokio::sync::mpsc::{self, error::TryRecvError};
use tokio::time::{Duration, Instant};
use tokio_stream::{Stream, StreamExt};
use tracing::{info, warn};

use crate::protocol::Resources;
use crate::MiB;

/// Monotonically increasing counter of the number of memory.high events
/// the cgroup has experienced.
///
/// We use this to determine if a modification to the `memory.events` file actually
/// changed the `high` field. If not, we don't care about the change. When we
/// read the file, we check the `high` field in the file against `MEMORY_EVENT_COUNT`
/// to see if it changed since last time.
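///
/// For reference, `memory.events` contains one `<event> <count>` pair per line,
/// roughly like this (counts are illustrative):
///
/// ```text
/// low 0
/// high 42
/// max 3
/// oom 0
/// oom_kill 0
/// oom_group_kill 0
/// ```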
pub static MEMORY_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);

/// Monotonically increasing counter that gives each cgroup event a unique id.
///
/// This allows us to answer questions like "did this upscale arrive before this
/// memory.high?". This static is also used by the `Sequenced` type to "tag" values
/// with a sequence number. As such, prefer to use the `Sequenced` type rather
/// than this static directly.
static EVENT_SEQUENCE_NUMBER: AtomicU64 = AtomicU64::new(0);

/// A memory event type reported in memory.events.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum MemoryEvent {
    Low,
    High,
    Max,
    Oom,
    OomKill,
    OomGroupKill,
}

impl MemoryEvent {
    fn as_str(&self) -> &str {
        match self {
            MemoryEvent::Low => "low",
            MemoryEvent::High => "high",
            MemoryEvent::Max => "max",
            MemoryEvent::Oom => "oom",
            MemoryEvent::OomKill => "oom_kill",
            MemoryEvent::OomGroupKill => "oom_group_kill",
        }
    }
}

impl Display for MemoryEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Configuration for a `CgroupWatcher`
#[derive(Debug, Clone)]
pub struct Config {
    // The target difference between the total memory reserved for the cgroup
    // and the value of the cgroup's memory.high.
    //
    // In other words, memory.high + oom_buffer_bytes will equal the total memory
    // that the cgroup may use (equal to system memory, minus whatever's taken out
    // for the file cache).
    oom_buffer_bytes: u64,

    // The amount of memory, in bytes, below a proposed new value for
    // memory.high that the cgroup's memory usage must be for us to downscale.
    //
    // In other words, we can downscale only when:
    //
    // memory.current + memory_high_buffer_bytes < (proposed) memory.high
    //
    // TODO: there are some minor issues with this approach -- in particular, that
    // we might have memory in use by the kernel's page cache that we're actually
    // ok with getting rid of.
    pub(crate) memory_high_buffer_bytes: u64,

    // The maximum duration that we're allowed to pause the cgroup for while
    // waiting for the autoscaler-agent to upscale us.
    max_upscale_wait: Duration,

    // The required minimum time that we must wait before re-freezing the cgroup
    // while waiting for the autoscaler-agent to upscale us.
    do_not_freeze_more_often_than: Duration,

    // The amount of memory, in bytes, that we should periodically increase
    // memory.high by while waiting for the autoscaler-agent to upscale us.
    //
    // This exists to avoid the excessive throttling that happens when a cgroup is
    // above its memory.high for too long. See more here:
    // https://github.com/neondatabase/autoscaling/issues/44#issuecomment-1522487217
    memory_high_increase_by_bytes: u64,

    // The period at which we should repeatedly increase the value of the cgroup's
    // memory.high while we're waiting on upscaling and memory.high is still being
    // hit.
    //
    // Technically speaking, this actually serves as a rate limit to moderate
    // responding to memory.high events, but these are roughly equivalent if the
    // process is still allocating memory.
    memory_high_increase_every: Duration,
}

impl Config {
    /// Calculate the new value for the cgroup's memory.high based on system memory.
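    ///
    /// Illustrative arithmetic (numbers hypothetical): with the default
    /// `oom_buffer_bytes` of 100 MiB and 4096 MiB of system memory, this
    /// returns 4096 MiB - 100 MiB = 3996 MiB for memory.high.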
    pub fn calculate_memory_high_value(&self, total_system_mem: u64) -> u64 {
        total_system_mem.saturating_sub(self.oom_buffer_bytes)
    }
}

impl Default for Config {
    fn default() -> Self {
        Self {
            oom_buffer_bytes: 100 * MiB,
            memory_high_buffer_bytes: 100 * MiB,
            // while waiting for upscale, don't freeze for more than 20ms every 1s
            max_upscale_wait: Duration::from_millis(20),
            do_not_freeze_more_often_than: Duration::from_millis(1000),
            // while waiting for upscale, increase memory.high by 10MiB every 25ms
            memory_high_increase_by_bytes: 10 * MiB,
            memory_high_increase_every: Duration::from_millis(25),
        }
    }
}

/// Used to represent data that is associated with a certain point in time, such
/// as an upscale request or memory.high event.
///
/// Internally, creating a `Sequenced` uses a static atomic counter to obtain
/// a unique sequence number. Sequence numbers are monotonically increasing,
/// allowing us to answer questions like "did this upscale happen after this
/// memory.high event?" by comparing the sequence numbers of the two events.
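///
/// A minimal sketch of that comparison (`seqnum` is private to this module, so
/// this is illustrative rather than a public API):
///
/// ```ignore
/// let event = Sequenced::new("memory.high");
/// let upscale = Sequenced::new("upscale");
/// // Created later, so it carries the larger sequence number.
/// assert!(upscale.seqnum > event.seqnum);
/// ```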
#[derive(Debug, Clone)]
pub struct Sequenced<T> {
    seqnum: u64,
    data: T,
}

impl<T> Sequenced<T> {
    pub fn new(data: T) -> Self {
        Self {
            seqnum: EVENT_SEQUENCE_NUMBER.fetch_add(1, Ordering::AcqRel),
            data,
        }
    }
}

/// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
/// OOM killed or throttled.
///
/// The `CgroupWatcher` primarily achieves this by reading from a stream of
/// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the
/// cgroup happy.
#[derive(Debug)]
pub struct CgroupWatcher {
    pub config: Config,

    /// The sequence number of the last upscale.
    ///
    /// If we receive a memory.high event that has a _lower_ sequence number than
    /// `last_upscale_seqnum`, then we know it occurred before the upscale, and we
    /// can safely ignore it.
    ///
    /// Note: Like the `events` field, this doesn't _need_ interior mutability but
    /// we use it anyway so that methods take `&self`, not `&mut self`.
    last_upscale_seqnum: AtomicU64,

    /// A channel on which we send messages to request upscale from the dispatcher.
    upscale_requester: mpsc::Sender<()>,

    /// The actual cgroup we are watching and managing.
    cgroup: cgroups_rs::Cgroup,
}

/// Read memory.events for the desired event type.
///
/// `path` specifies the path to the desired `memory.events` file.
/// For more info, see the `memory.events` section of the [kernel docs]
/// <https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files>
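///
/// An illustrative call (cgroup name hypothetical):
///
/// ```ignore
/// let high = get_event_count("/sys/fs/cgroup/my-cgroup/memory.events", MemoryEvent::High)?;
/// ```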
fn get_event_count(path: &str, event: MemoryEvent) -> anyhow::Result<u64> {
    let contents = fs::read_to_string(path)
        .with_context(|| format!("failed to read memory.events from {path}"))?;

    // The contents of the file look like:
    // low 42
    // high 101
    // ...
    contents
        .lines()
        .filter_map(|s| s.split_once(' '))
        .find(|(e, _)| *e == event.as_str())
        .ok_or_else(|| anyhow!("failed to find entry for memory.{event} events in {path}"))
        .and_then(|(_, count)| {
            count
                .parse::<u64>()
                .with_context(|| format!("failed to parse memory.{event} as u64"))
        })
}

/// Create an event stream that produces events whenever the file at the provided
/// path is modified.
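///
/// Illustrative usage (path hypothetical):
///
/// ```ignore
/// let mut events = pin!(create_file_watcher("/sys/fs/cgroup/my-cgroup/memory.events")?);
/// while let Some(_event) = events.next().await {
///     // the file was modified
/// }
/// ```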
fn create_file_watcher(path: &str) -> anyhow::Result<EventStream<[u8; 1024]>> {
    info!("creating file watcher for {path}");
    let inotify = Inotify::init().context("failed to initialize file watcher")?;
    inotify
        .watches()
        .add(path, WatchMask::MODIFY)
        .with_context(|| format!("failed to start watching {path}"))?;
    inotify
        // The inotify docs use [0u8; 1024] so we'll just copy them. We only need
        // to store one event at a time - if the event gets written over, that's
        // ok. We still see that there is an event. For more information, see:
        // https://man7.org/linux/man-pages/man7/inotify.7.html
        .into_event_stream([0u8; 1024])
        .context("failed to start inotify event stream")
}

impl CgroupWatcher {
    /// Create a new `CgroupWatcher`.
    #[tracing::instrument(skip_all, fields(%name))]
    pub fn new(
        name: String,
        // A channel on which to send upscale requests
        upscale_requester: mpsc::Sender<()>,
    ) -> anyhow::Result<(Self, impl Stream<Item = Sequenced<u64>>)> {
        // TODO: clarify exactly why we need v2
        // Make sure cgroups v2 (aka unified) are supported
        if !is_cgroup2_unified_mode() {
            anyhow::bail!("cgroups v2 not supported");
        }
        let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);

        // Start monitoring the cgroup for memory events. In general, for
        // cgroups v2 (aka unified), metrics are reported in files like
        // > `/sys/fs/cgroup/{name}/{metric}`
        // We are looking for `memory.high` events, which are stored in the
        // file `memory.events`. For more info, see the `memory.events` section
        // of https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files
        let path = format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name);
        let memory_events = create_file_watcher(&path)
            .with_context(|| format!("failed to create event watcher for {path}"))?
            // This would be nice with .inspect_err followed by .ok
            .filter_map(move |_| match get_event_count(&path, MemoryEvent::High) {
                Ok(high) => Some(high),
                Err(error) => {
                    // TODO: Might want to just panic here
                    warn!(?error, "failed to read high events count from {}", &path);
                    None
                }
            })
            // Only report the event if the memory.high count increased
            .filter_map(|high| {
                if MEMORY_EVENT_COUNT.fetch_max(high, Ordering::AcqRel) < high {
                    Some(high)
                } else {
                    None
                }
            })
            .map(Sequenced::new);

        let initial_count = get_event_count(
            &format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name),
            MemoryEvent::High,
        )?;

        info!(initial_count, "initial memory.high event count");

        // Hard update `MEMORY_EVENT_COUNT` since there could have been processes
        // running in the cgroup before that caused it to be non-zero.
        MEMORY_EVENT_COUNT.fetch_max(initial_count, Ordering::AcqRel);

        Ok((
            Self {
                cgroup,
                upscale_requester,
                last_upscale_seqnum: AtomicU64::new(0),
                config: Default::default(),
            },
            memory_events,
        ))
    }

    /// The entrypoint for the `CgroupWatcher`.
    #[tracing::instrument(skip_all)]
    pub async fn watch<E>(
        &self,
        // These are ~dependency injected~ (fancy, I know) because this function
        // should never return.
        // -> therefore: when we tokio::spawn it, we don't await the JoinHandle.
        // -> therefore: if we want to stick it in an Arc so many threads can access
        //    it, methods can never take mutable access.
        //    - note: we use the Arc strategy so that a) we can call this function
        //      right here and b) the runner can call the set/get_memory methods
        // -> since calling recv() on a tokio::sync::mpsc::Receiver takes &mut self,
        //    we just pass them in here instead of holding them in fields, as that
        //    would require this method to take &mut self.
        mut upscales: mpsc::Receiver<Sequenced<Resources>>,
        events: E,
    ) -> anyhow::Result<()>
    where
        E: Stream<Item = Sequenced<u64>>,
    {
        // There are several actions we might take when receiving a `memory.high`
        // event, such as freezing the cgroup or increasing its `memory.high`. We
        // don't want to do these things too often (because postgres needs to run,
        // and we only have so much memory). These timers serve as rate limits.
        let mut wait_to_freeze = pin!(tokio::time::sleep(Duration::ZERO));
        let mut wait_to_increase_memory_high = pin!(tokio::time::sleep(Duration::ZERO));
        let mut events = pin!(events);

        // Are we waiting to be upscaled? Could be true if we request upscale due
        // to a memory.high event and it does not arrive in time.
        let mut waiting_on_upscale = false;

        loop {
            tokio::select! {
                upscale = upscales.recv() => {
                    let Sequenced { seqnum, data } = upscale
                        .context("failed to listen on upscale notification channel")?;
                    self.last_upscale_seqnum.store(seqnum, Ordering::Release);
                    info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
                }
                event = events.next() => {
                    let Some(Sequenced { seqnum, .. }) = event else {
                        bail!("failed to listen for memory.high events")
                    };
                    // The memory.high came before our last upscale, so we consider
                    // it resolved
                    if self.last_upscale_seqnum.fetch_max(seqnum, Ordering::AcqRel) > seqnum {
                        info!(
                            "received memory.high event, but it came before our last upscale -> ignoring it"
                        );
                        continue;
                    }

                    // The memory.high came after our latest upscale. We don't
                    // want to do anything yet, so peek the next event in hopes
                    // that it's an upscale.
                    if let Some(upscale_num) = self
                        .upscaled(&mut upscales)
                        .context("failed to check if we were upscaled")?
                    {
                        if upscale_num > seqnum {
                            info!(
                                "received memory.high event, but it came before our last upscale -> ignoring it"
                            );
                            continue;
                        }
                    }

                    // If it's been long enough since we last froze, freeze the
                    // cgroup and request upscale
                    if wait_to_freeze.is_elapsed() {
                        info!("received memory.high event -> requesting upscale");
                        waiting_on_upscale = self
                            .handle_memory_high_event(&mut upscales)
                            .await
                            .context("failed to handle upscale")?;
                        wait_to_freeze
                            .as_mut()
                            .reset(Instant::now() + self.config.do_not_freeze_more_often_than);
                        continue;
                    }

                    // Ok, we can't freeze, just request upscale
                    if !waiting_on_upscale {
                        info!("received memory.high event, but too soon to refreeze -> requesting upscale");

                        // Check to make sure we haven't been upscaled in the
                        // meantime (can happen if the agent independently decides
                        // to upscale us again)
                        if self
                            .upscaled(&mut upscales)
                            .context("failed to check if we were upscaled")?
                            .is_some()
                        {
                            info!("no need to request upscaling because we got upscaled");
                            continue;
                        }
                        self.upscale_requester
                            .send(())
                            .await
                            .context("failed to request upscale")?;
                        continue;
                    }

                    // Shoot, we can't freeze and we're still waiting on upscale,
                    // so increase memory.high to reduce throttling
                    if wait_to_increase_memory_high.is_elapsed() {
                        info!(
                            "received memory.high event, \
                            but too soon to refreeze and already requested upscale \
                            -> increasing memory.high"
                        );

                        // Check to make sure we haven't been upscaled in the
                        // meantime (can happen if the agent independently decides
                        // to upscale us again)
                        if self
                            .upscaled(&mut upscales)
                            .context("failed to check if we were upscaled")?
                            .is_some()
                        {
                            info!("no need to increase memory.high because we got upscaled");
                            continue;
                        }

                        // Request upscale anyway (the agent will handle deduplicating
                        // requests)
                        self.upscale_requester
                            .send(())
                            .await
                            .context("failed to request upscale")?;

                        let memory_high =
                            self.get_high_bytes().context("failed to get memory.high")?;
                        let new_high = memory_high + self.config.memory_high_increase_by_bytes;
                        info!(
                            current_high_bytes = memory_high,
                            new_high_bytes = new_high,
                            "updating memory.high"
                        );
                        self.set_high_bytes(new_high)
                            .context("failed to set memory.high")?;
                        wait_to_increase_memory_high
                            .as_mut()
                            .reset(Instant::now() + self.config.memory_high_increase_every)
                    }

                    // we can't do anything
                }
            };
        }
    }

    /// Handle a `memory.high` event, returning whether we are still waiting on
    /// upscale by the time the function returns.
    ///
    /// The general plan for handling a `memory.high` event is as follows:
    /// 1. Freeze the cgroup
    /// 2. Start a timer for `self.config.max_upscale_wait`
    /// 3. Request upscale
    /// 4. After the timer elapses or we receive upscale, thaw the cgroup.
    /// 5. Return whether or not we are still waiting for upscale. If we are,
    ///    we'll increase the cgroup's memory.high to avoid getting oom killed
    #[tracing::instrument(skip_all)]
    async fn handle_memory_high_event(
        &self,
        upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
    ) -> anyhow::Result<bool> {
        // Immediately freeze the cgroup before doing anything else.
        info!("received memory.high event -> freezing cgroup");
        self.freeze().context("failed to freeze cgroup")?;

        // We'll use this for logging durations
        let start_time = Instant::now();

        // Await the upscale until we have to unfreeze
        let timed =
            tokio::time::timeout(self.config.max_upscale_wait, self.await_upscale(upscales));

        // Request the upscale
        info!(
            wait = ?self.config.max_upscale_wait,
            "sending request for immediate upscaling",
        );
        self.upscale_requester
            .send(())
            .await
            .context("failed to request upscale")?;

        let waiting_on_upscale = match timed.await {
            Ok(Ok(())) => {
                info!(elapsed = ?start_time.elapsed(), "received upscale in time");
                false
            }
            // **important**: unfreeze the cgroup before ?-reporting the error
            Ok(Err(e)) => {
                info!("error waiting for upscale -> thawing cgroup");
                self.thaw()
                    .context("failed to thaw cgroup after error while waiting for upscale")?;
                Err(e.context("failed to await upscale"))?
            }
            Err(_) => {
                info!(elapsed = ?self.config.max_upscale_wait, "timed out waiting for upscale");
                true
            }
        };

        info!("thawing cgroup");
        self.thaw().context("failed to thaw cgroup")?;

        Ok(waiting_on_upscale)
    }

    /// Checks whether we were just upscaled, returning the upscale's sequence
    /// number if so.
    #[tracing::instrument(skip_all)]
    fn upscaled(
        &self,
        upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
    ) -> anyhow::Result<Option<u64>> {
        let Sequenced { seqnum, data } = match upscales.try_recv() {
            Ok(upscale) => upscale,
            Err(TryRecvError::Empty) => return Ok(None),
            Err(TryRecvError::Disconnected) => {
                bail!("upscale notification channel was disconnected")
            }
        };

        // Make sure to update the last upscale sequence number
        self.last_upscale_seqnum.store(seqnum, Ordering::Release);
        info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
        Ok(Some(seqnum))
    }

    /// Await an upscale event, discarding any `memory.high` events received in
    /// the process.
    ///
    /// This is used in `handle_memory_high_event`, where we need to listen
    /// for upscales in particular so we know if we can thaw the cgroup early.
    #[tracing::instrument(skip_all)]
    async fn await_upscale(
        &self,
        upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
    ) -> anyhow::Result<()> {
        let Sequenced { seqnum, .. } = upscales
            .recv()
            .await
            .context("error listening for upscales")?;

        self.last_upscale_seqnum.store(seqnum, Ordering::Release);
        Ok(())
    }

    /// Get the cgroup's path (i.e. its name).
    pub fn path(&self) -> &str {
        self.cgroup.path()
    }
}

/// Represents a set of limits we apply to a cgroup to control memory usage.
///
/// Setting these values also affects the thresholds for receiving usage alerts.
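///
/// For example (numbers illustrative): pairing `Config::default()` with 4 GiB
/// of available memory, `set_memory_limits` below would write
/// high = 4 GiB - 100 MiB and max = 4 GiB.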
#[derive(Debug)]
pub struct MemoryLimits {
    high: u64,
    max: u64,
}

impl MemoryLimits {
    pub fn new(high: u64, max: u64) -> Self {
        Self { max, high }
    }
}

// Methods for manipulating the actual cgroup
impl CgroupWatcher {
    /// Get a handle on the freezer subsystem.
    fn freezer(&self) -> anyhow::Result<&FreezerController> {
        if let Some(Freezer(freezer)) = self
            .cgroup
            .subsystems()
            .iter()
            .find(|sub| matches!(sub, Freezer(_)))
        {
            Ok(freezer)
        } else {
            anyhow::bail!("could not find freezer subsystem")
        }
    }

    /// Attempt to freeze the cgroup.
    pub fn freeze(&self) -> anyhow::Result<()> {
        self.freezer()
            .context("failed to get freezer subsystem")?
            .freeze()
            .context("failed to freeze")
    }

    /// Attempt to thaw the cgroup.
    pub fn thaw(&self) -> anyhow::Result<()> {
        self.freezer()
            .context("failed to get freezer subsystem")?
            .thaw()
            .context("failed to thaw")
    }

    /// Get a handle on the memory subsystem.
    ///
    /// Note: getting a handle to the subsystem does not access any of the
    /// files we care about, such as memory.high and memory.events, so no
    /// extra synchronization is needed here.
    fn memory(&self) -> anyhow::Result<&MemController> {
        if let Some(Mem(memory)) = self
            .cgroup
            .subsystems()
            .iter()
            .find(|sub| matches!(sub, Mem(_)))
        {
            Ok(memory)
        } else {
            anyhow::bail!("could not find memory subsystem")
        }
    }

    /// Get the cgroup's current memory usage.
    pub fn current_memory_usage(&self) -> anyhow::Result<u64> {
        Ok(self
            .memory()
            .context("failed to get memory subsystem")?
            .memory_stat()
            .usage_in_bytes)
    }

    /// Set cgroup memory.high threshold.
    pub fn set_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
        self.memory()
            .context("failed to get memory subsystem")?
            .set_mem(cgroups_rs::memory::SetMemory {
                low: None,
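                // `MaxValue::Value` holds an i64, so clamp large u64 values to
                // i64::MAX instead of letting the cast wrap negative.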
                high: Some(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)),
                min: None,
                max: None,
            })
            .context("failed to set memory.high")
    }

    /// Set cgroup memory.high and memory.max.
    pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
        info!(
            limits.high,
            limits.max,
            path = self.path(),
            "writing new memory limits",
        );
        self.memory()
            .context("failed to get memory subsystem while setting memory limits")?
            .set_mem(cgroups_rs::memory::SetMemory {
                min: None,
                low: None,
                high: Some(MaxValue::Value(
                    u64::min(limits.high, i64::MAX as u64) as i64
                )),
                max: Some(MaxValue::Value(u64::min(limits.max, i64::MAX as u64) as i64)),
            })
            .context("failed to set memory limits")
    }

    /// Given some amount of available memory, set the desired cgroup memory limits
    pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
        let new_high = self.config.calculate_memory_high_value(available_memory);
        let limits = MemoryLimits::new(new_high, available_memory);
        info!(
            path = self.path(),
            memory = ?limits,
            "setting cgroup memory",
        );
        self.set_limits(&limits)
            .context("failed to set cgroup memory limits")?;
        Ok(())
    }

    /// Get memory.high threshold.
    pub fn get_high_bytes(&self) -> anyhow::Result<u64> {
        let high = self
            .memory()
            .context("failed to get memory subsystem while getting memory statistics")?
            .get_mem()
            .map(|mem| mem.high)
            .context("failed to get memory statistics from subsystem")?;
        match high {
            Some(MaxValue::Max) => Ok(i64::MAX as u64),
            Some(MaxValue::Value(high)) => Ok(high as u64),
            None => anyhow::bail!("failed to read memory.high from memory subsystem"),
        }
    }
}