LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - cgroup.rs (source / functions)
Test:      8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01
Coverage:  Lines: 0.0 % (0 of 214)    Functions: 0.0 % (0 of 89)

            Line data    Source code
       1              : use std::{
       2              :     fmt::{Debug, Display},
       3              :     fs,
       4              :     pin::pin,
       5              :     sync::atomic::{AtomicU64, Ordering},
       6              : };
       7              : 
       8              : use anyhow::{anyhow, bail, Context};
       9              : use cgroups_rs::{
      10              :     freezer::FreezerController,
      11              :     hierarchies::{self, is_cgroup2_unified_mode, UNIFIED_MOUNTPOINT},
      12              :     memory::MemController,
      13              :     MaxValue,
      14              :     Subsystem::{Freezer, Mem},
      15              : };
      16              : use inotify::{EventStream, Inotify, WatchMask};
      17              : use tokio::sync::mpsc::{self, error::TryRecvError};
      18              : use tokio::time::{Duration, Instant};
      19              : use tokio_stream::{Stream, StreamExt};
      20              : use tracing::{info, warn};
      21              : 
      22              : use crate::protocol::Resources;
      23              : use crate::MiB;
      24              : 
      25              : /// Monotonically increasing counter of the number of memory.high events
      26              : /// the cgroup has experienced.
      27              : ///
      28              : /// We use this to determine if a modification to the `memory.events` file actually
      29              : /// changed the `high` field. If not, we don't care about the change. When we
      30              : /// read the file, we check the `high` field in the file against `MEMORY_EVENT_COUNT`
      31              : /// to see if it changed since last time.
      32              : pub static MEMORY_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
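
// (Illustrative sketch, not part of the original file.) The "did the `high`
// count actually increase?" check described above boils down to a single
// `fetch_max`: it returns the previous value, so the event is new exactly when
// that previous value is smaller than the count we just read from the file.
#[allow(dead_code)]
fn example_is_new_high_event(high_from_file: u64) -> bool {
    MEMORY_EVENT_COUNT.fetch_max(high_from_file, Ordering::AcqRel) < high_from_file
}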
      33              : 
      34              : /// Monotonically increasing counter that gives each cgroup event a unique id.
      35              : ///
      36              : /// This allows us to answer questions like "did this upscale arrive before this
      37              : /// memory.high?". This static is also used by the `Sequenced` type to "tag" values
       38              : /// with a sequence number. As such, prefer to use the `Sequenced` type rather
      39              : /// than this static directly.
      40              : static EVENT_SEQUENCE_NUMBER: AtomicU64 = AtomicU64::new(0);
      41              : 
      42              : /// A memory event type reported in memory.events.
      43            0 : #[derive(Debug, Eq, PartialEq, Copy, Clone)]
      44              : pub enum MemoryEvent {
      45              :     Low,
      46              :     High,
      47              :     Max,
      48              :     Oom,
      49              :     OomKill,
      50              :     OomGroupKill,
      51              : }
      52              : 
      53              : impl MemoryEvent {
      54            0 :     fn as_str(&self) -> &str {
      55            0 :         match self {
      56            0 :             MemoryEvent::Low => "low",
      57            0 :             MemoryEvent::High => "high",
      58            0 :             MemoryEvent::Max => "max",
      59            0 :             MemoryEvent::Oom => "oom",
      60            0 :             MemoryEvent::OomKill => "oom_kill",
      61            0 :             MemoryEvent::OomGroupKill => "oom_group_kill",
      62              :         }
      63            0 :     }
      64              : }
      65              : 
      66              : impl Display for MemoryEvent {
      67            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      68            0 :         f.write_str(self.as_str())
      69            0 :     }
      70              : }
      71              : 
      72              : /// Configuration for a `CgroupWatcher`
      73            0 : #[derive(Debug, Clone)]
      74              : pub struct Config {
      75              :     // The target difference between the total memory reserved for the cgroup
      76              :     // and the value of the cgroup's memory.high.
      77              :     //
      78              :     // In other words, memory.high + oom_buffer_bytes will equal the total memory that the cgroup may
      79              :     // use (equal to system memory, minus whatever's taken out for the file cache).
      80              :     oom_buffer_bytes: u64,
      81              : 
      82              :     // The amount of memory, in bytes, below a proposed new value for
      83              :     // memory.high that the cgroup's memory usage must be for us to downscale
      84              :     //
      85              :     // In other words, we can downscale only when:
      86              :     //
      87              :     //   memory.current + memory_high_buffer_bytes < (proposed) memory.high
      88              :     //
      89              :     // TODO: there's some minor issues with this approach -- in particular, that we might have
      90              :     // memory in use by the kernel's page cache that we're actually ok with getting rid of.
      91              :     pub(crate) memory_high_buffer_bytes: u64,
      92              : 
      93              :     // The maximum duration, in milliseconds, that we're allowed to pause
      94              :     // the cgroup for while waiting for the autoscaler-agent to upscale us
      95              :     max_upscale_wait: Duration,
      96              : 
      97              :     // The required minimum time, in milliseconds, that we must wait before re-freezing
      98              :     // the cgroup while waiting for the autoscaler-agent to upscale us.
      99              :     do_not_freeze_more_often_than: Duration,
     100              : 
     101              :     // The amount of memory, in bytes, that we should periodically increase memory.high
     102              :     // by while waiting for the autoscaler-agent to upscale us.
     103              :     //
     104              :     // This exists to avoid the excessive throttling that happens when a cgroup is above its
     105              :     // memory.high for too long. See more here:
     106              :     // https://github.com/neondatabase/autoscaling/issues/44#issuecomment-1522487217
     107              :     memory_high_increase_by_bytes: u64,
     108              : 
     109              :     // The period, in milliseconds, at which we should repeatedly increase the value
     110              :     // of the cgroup's memory.high while we're waiting on upscaling and memory.high
     111              :     // is still being hit.
     112              :     //
     113              :     // Technically speaking, this actually serves as a rate limit to moderate responding to
     114              :     // memory.high events, but these are roughly equivalent if the process is still allocating
     115              :     // memory.
     116              :     memory_high_increase_every: Duration,
     117              : }
     118              : 
     119              : impl Config {
     120              :     /// Calculate the new value for the cgroups memory.high based on system memory
     121            0 :     pub fn calculate_memory_high_value(&self, total_system_mem: u64) -> u64 {
     122            0 :         total_system_mem.saturating_sub(self.oom_buffer_bytes)
     123            0 :     }
     124              : }
     125              : 
     126              : impl Default for Config {
     127            0 :     fn default() -> Self {
     128            0 :         Self {
     129            0 :             oom_buffer_bytes: 100 * MiB,
     130            0 :             memory_high_buffer_bytes: 100 * MiB,
     131            0 :             // while waiting for upscale, don't freeze for more than 20ms every 1s
     132            0 :             max_upscale_wait: Duration::from_millis(20),
     133            0 :             do_not_freeze_more_often_than: Duration::from_millis(1000),
     134            0 :             // while waiting for upscale, increase memory.high by 10MiB every 25ms
     135            0 :             memory_high_increase_by_bytes: 10 * MiB,
     136            0 :             memory_high_increase_every: Duration::from_millis(25),
     137            0 :         }
     138            0 :     }
     139              : }
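
// (Illustrative sketch, not part of the original file.) How the `Config` knobs
// above combine, using the default values. `total_system_mem`, `current_usage`,
// and `proposed_high` are hypothetical numbers chosen for the example.
#[allow(dead_code)]
fn example_config_math() {
    let config = Config::default();

    // With 4 GiB of system memory, memory.high sits oom_buffer_bytes (100 MiB)
    // below it, leaving headroom before the OOM killer would get involved.
    let total_system_mem: u64 = 4096 * MiB;
    let high = config.calculate_memory_high_value(total_system_mem);
    assert_eq!(high, total_system_mem - 100 * MiB);

    // Downscaling to a proposed memory.high is only acceptable while current
    // usage is at least memory_high_buffer_bytes below the proposed value.
    let current_usage: u64 = 1024 * MiB;
    let proposed_high: u64 = 2048 * MiB;
    assert!(current_usage + config.memory_high_buffer_bytes < proposed_high);
}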
     140              : 
     141              : /// Used to represent data that is associated with a certain point in time, such
     142              : /// as an upscale request or memory.high event.
     143              : ///
     144              : /// Internally, creating a `Sequenced` uses a static atomic counter to obtain
     145              : /// a unique sequence number. Sequence numbers are monotonically increasing,
     146              : /// allowing us to answer questions like "did this upscale happen after this
     147              : /// memory.high event?" by comparing the sequence numbers of the two events.
     148            0 : #[derive(Debug, Clone)]
     149              : pub struct Sequenced<T> {
     150              :     seqnum: u64,
     151              :     data: T,
     152              : }
     153              : 
     154              : impl<T> Sequenced<T> {
     155            0 :     pub fn new(data: T) -> Self {
     156            0 :         Self {
     157            0 :             seqnum: EVENT_SEQUENCE_NUMBER.fetch_add(1, Ordering::AcqRel),
     158            0 :             data,
     159            0 :         }
     160            0 :     }
     161              : }
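
// (Illustrative sketch, not part of the original file.) The property `Sequenced`
// provides: values created later always carry a strictly larger sequence number,
// so "did this upscale arrive after that memory.high event?" reduces to comparing
// the two `seqnum`s, which is exactly what the watcher loop below does.
#[allow(dead_code)]
fn example_sequencing() {
    let memory_high = Sequenced::new(42u64); // e.g. a memory.high event count
    let upscale = Sequenced::new(());        // e.g. a later upscale notification
    assert!(upscale.seqnum > memory_high.seqnum);
}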
     162              : 
     163              : /// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
      164              : /// OOM killed or excessively throttled.
     165              : ///
     166              : /// The `CgroupWatcher` primarily achieves this by reading from a stream of
      167              : /// `MonitorEvent`s. See `watch` for details on how to keep the
     168              : /// cgroup happy.
     169            0 : #[derive(Debug)]
     170              : pub struct CgroupWatcher {
     171              :     pub config: Config,
     172              : 
     173              :     /// The sequence number of the last upscale.
     174              :     ///
     175              :     /// If we receive a memory.high event that has a _lower_ sequence number than
      176              :     /// `last_upscale_seqnum`, then we know it occurred before the upscale, and we
     177              :     /// can safely ignore it.
     178              :     ///
      179              :     /// Note: This doesn't _need_ interior mutability, but we use it anyway so
      180              :     /// that methods take `&self`, not `&mut self`.
     181              :     last_upscale_seqnum: AtomicU64,
     182              : 
     183              :     /// A channel on which we send messages to request upscale from the dispatcher.
     184              :     upscale_requester: mpsc::Sender<()>,
     185              : 
     186              :     /// The actual cgroup we are watching and managing.
     187              :     cgroup: cgroups_rs::Cgroup,
     188              : }
     189              : 
     190              : /// Read memory.events for the desired event type.
     191              : ///
     192              : /// `path` specifies the path to the desired `memory.events` file.
     193              : /// For more info, see the `memory.events` section of the [kernel docs]
     194              : /// <https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files>
     195            0 : fn get_event_count(path: &str, event: MemoryEvent) -> anyhow::Result<u64> {
     196            0 :     let contents = fs::read_to_string(path)
     197            0 :         .with_context(|| format!("failed to read memory.events from {path}"))?;
     198              : 
      199              :     // The contents of the file look like:
     200              :     // low 42
     201              :     // high 101
     202              :     // ...
     203            0 :     contents
     204            0 :         .lines()
     205            0 :         .filter_map(|s| s.split_once(' '))
     206            0 :         .find(|(e, _)| *e == event.as_str())
     207            0 :         .ok_or_else(|| anyhow!("failed to find entry for memory.{event} events in {path}"))
     208            0 :         .and_then(|(_, count)| {
     209            0 :             count
     210            0 :                 .parse::<u64>()
     211            0 :                 .with_context(|| format!("failed to parse memory.{event} as u64"))
     212            0 :         })
     213            0 : }
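
// (Illustrative sketch, not a test in the original file.) The format that
// `get_event_count` parses, using a temporary file in place of the real
// /sys/fs/cgroup/<name>/memory.events.
#[cfg(test)]
mod get_event_count_example {
    use super::*;

    #[test]
    fn parses_high_field() {
        let path = std::env::temp_dir().join("memory.events.example");
        fs::write(&path, "low 0\nhigh 101\nmax 0\noom 0\noom_kill 0\noom_group_kill 0\n").unwrap();
        let count = get_event_count(path.to_str().unwrap(), MemoryEvent::High).unwrap();
        assert_eq!(count, 101);
    }
}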
     214              : 
     215              : /// Create an event stream that produces events whenever the file at the provided
     216              : /// path is modified.
     217            0 : fn create_file_watcher(path: &str) -> anyhow::Result<EventStream<[u8; 1024]>> {
     218            0 :     info!("creating file watcher for {path}");
     219            0 :     let inotify = Inotify::init().context("failed to initialize file watcher")?;
     220            0 :     inotify
     221            0 :         .watches()
     222            0 :         .add(path, WatchMask::MODIFY)
     223            0 :         .with_context(|| format!("failed to start watching {path}"))?;
     224            0 :     inotify
     225            0 :         // The inotify docs use [0u8; 1024] so we'll just copy them. We only need
     226            0 :         // to store one event at a time - if the event gets written over, that's
     227            0 :         // ok. We still see that there is an event. For more information, see:
     228            0 :         // https://man7.org/linux/man-pages/man7/inotify.7.html
     229            0 :         .into_event_stream([0u8; 1024])
     230            0 :         .context("failed to start inotify event stream")
     231            0 : }
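
// (Illustrative sketch, not part of the original file.) Consuming the watcher:
// each modification of the watched file yields one item on the stream; the
// constructor below maps those items into memory.high event counts.
#[allow(dead_code)]
async fn example_consume_watcher(path: &str) -> anyhow::Result<()> {
    let mut events = pin!(create_file_watcher(path)?);
    while let Some(event) = events.next().await {
        info!(?event, "watched file was modified");
    }
    Ok(())
}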
     232              : 
     233              : impl CgroupWatcher {
     234              :     /// Create a new `CgroupWatcher`.
     235            0 :     #[tracing::instrument(skip_all, fields(%name))]
     236              :     pub fn new(
     237              :         name: String,
     238              :         // A channel on which to send upscale requests
     239              :         upscale_requester: mpsc::Sender<()>,
     240              :     ) -> anyhow::Result<(Self, impl Stream<Item = Sequenced<u64>>)> {
     241              :         // TODO: clarify exactly why we need v2
     242              :         // Make sure cgroups v2 (aka unified) are supported
     243              :         if !is_cgroup2_unified_mode() {
     244              :             anyhow::bail!("cgroups v2 not supported");
     245              :         }
     246              :         let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
     247              : 
     248              :         // Start monitoring the cgroup for memory events. In general, for
     249              :         // cgroups v2 (aka unified), metrics are reported in files like
     250              :         // > `/sys/fs/cgroup/{name}/{metric}`
     251              :         // We are looking for `memory.high` events, which are stored in the
     252              :         // file `memory.events`. For more info, see the `memory.events` section
     253              :         // of https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files
     254              :         let path = format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name);
     255              :         let memory_events = create_file_watcher(&path)
     256            0 :             .with_context(|| format!("failed to create event watcher for {path}"))?
      257              :             // This would be nicer with .inspect_err followed by .ok
     258            0 :             .filter_map(move |_| match get_event_count(&path, MemoryEvent::High) {
     259            0 :                 Ok(high) => Some(high),
     260            0 :                 Err(error) => {
     261            0 :                     // TODO: Might want to just panic here
     262            0 :                     warn!(?error, "failed to read high events count from {}", &path);
     263            0 :                     None
     264              :                 }
     265            0 :             })
     266              :             // Only report the event if the memory.high count increased
     267            0 :             .filter_map(|high| {
     268            0 :                 if MEMORY_EVENT_COUNT.fetch_max(high, Ordering::AcqRel) < high {
     269            0 :                     Some(high)
     270              :                 } else {
     271            0 :                     None
     272              :                 }
     273            0 :             })
     274              :             .map(Sequenced::new);
     275              : 
     276              :         let initial_count = get_event_count(
     277              :             &format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name),
     278              :             MemoryEvent::High,
     279              :         )?;
     280              : 
     281            0 :         info!(initial_count, "initial memory.high event count");
     282              : 
     283              :         // Hard update `MEMORY_EVENT_COUNT` since there could have been processes
     284              :         // running in the cgroup before that caused it to be non-zero.
     285              :         MEMORY_EVENT_COUNT.fetch_max(initial_count, Ordering::AcqRel);
     286              : 
     287              :         Ok((
     288              :             Self {
     289              :                 cgroup,
     290              :                 upscale_requester,
     291              :                 last_upscale_seqnum: AtomicU64::new(0),
     292              :                 config: Default::default(),
     293              :             },
     294              :             memory_events,
     295              :         ))
     296              :     }
     297              : 
     298              :     /// The entrypoint for the `CgroupWatcher`.
     299            0 :     #[tracing::instrument(skip_all)]
     300              :     pub async fn watch<E>(
     301              :         &self,
     302              :         // These are ~dependency injected~ (fancy, I know) because this function
     303              :         // should never return.
     304              :         // -> therefore: when we tokio::spawn it, we don't await the JoinHandle.
     305              :         // -> therefore: if we want to stick it in an Arc so many threads can access
     306              :         //    it, methods can never take mutable access.
     307              :         //     - note: we use the Arc strategy so that a) we can call this function
     308              :         //             right here and b) the runner can call the set/get_memory methods
     309              :         // -> since calling recv() on a tokio::sync::mpsc::Receiver takes &mut self,
     310              :         //    we just pass them in here instead of holding them in fields, as that
     311              :         //    would require this method to take &mut self.
     312              :         mut upscales: mpsc::Receiver<Sequenced<Resources>>,
     313              :         events: E,
     314              :     ) -> anyhow::Result<()>
     315              :     where
     316              :         E: Stream<Item = Sequenced<u64>>,
     317              :     {
      318              :         // There are several actions we might take when receiving a `memory.high`
      319              :         // event, such as freezing the cgroup or increasing its `memory.high`. We don't
     320              :         // want to do these things too often (because postgres needs to run, and
     321              :         // we only have so much memory). These timers serve as rate limits for this.
     322              :         let mut wait_to_freeze = pin!(tokio::time::sleep(Duration::ZERO));
     323              :         let mut wait_to_increase_memory_high = pin!(tokio::time::sleep(Duration::ZERO));
     324              :         let mut events = pin!(events);
     325              : 
     326              :         // Are we waiting to be upscaled? Could be true if we request upscale due
     327              :         // to a memory.high event and it does not arrive in time.
     328              :         let mut waiting_on_upscale = false;
     329              : 
     330              :         loop {
     331            0 :             tokio::select! {
     332            0 :                 upscale = upscales.recv() => {
     333              :                     let Sequenced { seqnum, data } = upscale
     334              :                         .context("failed to listen on upscale notification channel")?;
     335              :                     self.last_upscale_seqnum.store(seqnum, Ordering::Release);
     336            0 :                     info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
     337              :                 }
     338            0 :                 event = events.next() => {
     339              :                     let Some(Sequenced { seqnum, .. }) = event else {
     340              :                         bail!("failed to listen for memory.high events")
     341              :                     };
     342              :                     // The memory.high came before our last upscale, so we consider
     343              :                     // it resolved
     344              :                     if self.last_upscale_seqnum.fetch_max(seqnum, Ordering::AcqRel) > seqnum {
     345            0 :                         info!(
     346            0 :                             "received memory.high event, but it came before our last upscale -> ignoring it"
     347            0 :                         );
     348              :                         continue;
     349              :                     }
     350              : 
     351              :                     // The memory.high came after our latest upscale. We don't
     352              :                     // want to do anything yet, so peek the next event in hopes
     353              :                     // that it's an upscale.
     354              :                     if let Some(upscale_num) = self
     355              :                         .upscaled(&mut upscales)
     356              :                         .context("failed to check if we were upscaled")?
     357              :                     {
     358              :                         if upscale_num > seqnum {
     359            0 :                             info!(
     360            0 :                                 "received memory.high event, but it came before our last upscale -> ignoring it"
     361            0 :                             );
     362              :                             continue;
     363              :                         }
     364              :                     }
     365              : 
     366              :                     // If it's been long enough since we last froze, freeze the
     367              :                     // cgroup and request upscale
     368              :                     if wait_to_freeze.is_elapsed() {
     369            0 :                         info!("received memory.high event -> requesting upscale");
     370              :                         waiting_on_upscale = self
     371              :                             .handle_memory_high_event(&mut upscales)
     372              :                             .await
     373              :                             .context("failed to handle upscale")?;
     374              :                         wait_to_freeze
     375              :                             .as_mut()
     376              :                             .reset(Instant::now() + self.config.do_not_freeze_more_often_than);
     377              :                         continue;
     378              :                     }
     379              : 
     380              :                     // Ok, we can't freeze, just request upscale
     381              :                     if !waiting_on_upscale {
     382            0 :                         info!("received memory.high event, but too soon to refreeze -> requesting upscale");
     383              : 
      384              :                         // Check to make sure we haven't been upscaled in the
      385              :                         // meantime (this can happen if the agent independently
      386              :                         // decides to upscale us again)
     387              :                         if self
     388              :                             .upscaled(&mut upscales)
     389              :                             .context("failed to check if we were upscaled")?
     390              :                             .is_some()
     391              :                         {
     392            0 :                             info!("no need to request upscaling because we got upscaled");
     393              :                             continue;
     394              :                         }
     395              :                         self.upscale_requester
     396              :                             .send(())
     397              :                             .await
     398              :                             .context("failed to request upscale")?;
     399              :                         continue;
     400              :                     }
     401              : 
      402              :                     // Shoot, we can't freeze and we're still waiting on upscale,
      403              :                     // so increase memory.high to reduce throttling
     404              :                     if wait_to_increase_memory_high.is_elapsed() {
     405            0 :                         info!(
     406            0 :                             "received memory.high event, \
     407            0 :                             but too soon to refreeze and already requested upscale \
     408            0 :                             -> increasing memory.high"
     409            0 :                         );
     410              : 
      411              :                         // Check to make sure we haven't been upscaled in the
      412              :                         // meantime (this can happen if the agent independently
      413              :                         // decides to upscale us again)
     414              :                         if self
     415              :                             .upscaled(&mut upscales)
     416              :                             .context("failed to check if we were upscaled")?
     417              :                             .is_some()
     418              :                         {
     419            0 :                             info!("no need to increase memory.high because got upscaled");
     420              :                             continue;
     421              :                         }
     422              : 
     423              :                         // Request upscale anyways (the agent will handle deduplicating
     424              :                         // requests)
     425              :                         self.upscale_requester
     426              :                             .send(())
     427              :                             .await
     428              :                             .context("failed to request upscale")?;
     429              : 
     430              :                         let memory_high =
     431              :                             self.get_high_bytes().context("failed to get memory.high")?;
     432              :                         let new_high = memory_high + self.config.memory_high_increase_by_bytes;
     433            0 :                         info!(
     434            0 :                             current_high_bytes = memory_high,
     435            0 :                             new_high_bytes = new_high,
     436            0 :                             "updating memory.high"
     437            0 :                         );
     438              :                         self.set_high_bytes(new_high)
     439              :                             .context("failed to set memory.high")?;
     440              :                         wait_to_increase_memory_high
     441              :                             .as_mut()
     442              :                             .reset(Instant::now() + self.config.memory_high_increase_every)
     443              :                     }
     444              : 
     445              :                     // we can't do anything
     446              :                 }
     447              :             };
     448              :         }
     449              :     }
     450              : 
     451              :     /// Handle a `memory.high`, returning whether we are still waiting on upscale
     452              :     /// by the time the function returns.
     453              :     ///
     454              :     /// The general plan for handling a `memory.high` event is as follows:
     455              :     /// 1. Freeze the cgroup
     456              :     /// 2. Start a timer for `self.config.max_upscale_wait`
     457              :     /// 3. Request upscale
     458              :     /// 4. After the timer elapses or we receive upscale, thaw the cgroup.
     459              :     /// 5. Return whether or not we are still waiting for upscale. If we are,
      460              :     ///    we'll increase the cgroup's memory.high to avoid getting OOM killed
     461            0 :     #[tracing::instrument(skip_all)]
     462              :     async fn handle_memory_high_event(
     463              :         &self,
     464              :         upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
     465              :     ) -> anyhow::Result<bool> {
     466              :         // Immediately freeze the cgroup before doing anything else.
     467            0 :         info!("received memory.high event -> freezing cgroup");
     468              :         self.freeze().context("failed to freeze cgroup")?;
     469              : 
     470              :         // We'll use this for logging durations
     471              :         let start_time = Instant::now();
     472              : 
     473              :         // Await the upscale until we have to unfreeze
     474              :         let timed =
     475              :             tokio::time::timeout(self.config.max_upscale_wait, self.await_upscale(upscales));
     476              : 
     477              :         // Request the upscale
     478            0 :         info!(
     479            0 :             wait = ?self.config.max_upscale_wait,
     480            0 :             "sending request for immediate upscaling",
     481            0 :         );
     482              :         self.upscale_requester
     483              :             .send(())
     484              :             .await
     485              :             .context("failed to request upscale")?;
     486              : 
     487              :         let waiting_on_upscale = match timed.await {
     488              :             Ok(Ok(())) => {
     489            0 :                 info!(elapsed = ?start_time.elapsed(), "received upscale in time");
     490              :                 false
     491              :             }
     492              :             // **important**: unfreeze the cgroup before ?-reporting the error
     493              :             Ok(Err(e)) => {
     494            0 :                 info!("error waiting for upscale -> thawing cgroup");
     495              :                 self.thaw()
     496              :                     .context("failed to thaw cgroup after errored waiting for upscale")?;
     497              :                 Err(e.context("failed to await upscale"))?
     498              :             }
     499              :             Err(_) => {
     500            0 :                 info!(elapsed = ?self.config.max_upscale_wait, "timed out waiting for upscale");
     501              :                 true
     502              :             }
     503              :         };
     504              : 
     505            0 :         info!("thawing cgroup");
     506              :         self.thaw().context("failed to thaw cgroup")?;
     507              : 
     508              :         Ok(waiting_on_upscale)
     509              :     }
     510              : 
     511              :     /// Checks whether we were just upscaled, returning the upscale's sequence
     512              :     /// number if so.
     513            0 :     #[tracing::instrument(skip_all)]
     514              :     fn upscaled(
     515              :         &self,
     516              :         upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
     517              :     ) -> anyhow::Result<Option<u64>> {
     518              :         let Sequenced { seqnum, data } = match upscales.try_recv() {
     519              :             Ok(upscale) => upscale,
     520              :             Err(TryRecvError::Empty) => return Ok(None),
     521              :             Err(TryRecvError::Disconnected) => {
     522              :                 bail!("upscale notification channel was disconnected")
     523              :             }
     524              :         };
     525              : 
     526              :         // Make sure to update the last upscale sequence number
     527              :         self.last_upscale_seqnum.store(seqnum, Ordering::Release);
     528            0 :         info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
     529              :         Ok(Some(seqnum))
     530              :     }
     531              : 
     532              :     /// Await an upscale event, discarding any `memory.high` events received in
     533              :     /// the process.
     534              :     ///
     535              :     /// This is used in `handle_memory_high_event`, where we need to listen
     536              :     /// for upscales in particular so we know if we can thaw the cgroup early.
     537            0 :     #[tracing::instrument(skip_all)]
     538              :     async fn await_upscale(
     539              :         &self,
     540              :         upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
     541              :     ) -> anyhow::Result<()> {
     542              :         let Sequenced { seqnum, .. } = upscales
     543              :             .recv()
     544              :             .await
     545              :             .context("error listening for upscales")?;
     546              : 
     547              :         self.last_upscale_seqnum.store(seqnum, Ordering::Release);
     548              :         Ok(())
     549              :     }
     550              : 
     551              :     /// Get the cgroup's name.
     552            0 :     pub fn path(&self) -> &str {
     553            0 :         self.cgroup.path()
     554            0 :     }
     555              : }
     556              : 
     557              : /// Represents a set of limits we apply to a cgroup to control memory usage.
     558              : ///
     559              : /// Setting these values also affects the thresholds for receiving usage alerts.
     560            0 : #[derive(Debug)]
     561              : pub struct MemoryLimits {
     562              :     high: u64,
     563              :     max: u64,
     564              : }
     565              : 
     566              : impl MemoryLimits {
     567            0 :     pub fn new(high: u64, max: u64) -> Self {
     568            0 :         Self { max, high }
     569            0 :     }
     570              : }
     571              : 
     572              : // Methods for manipulating the actual cgroup
     573              : impl CgroupWatcher {
     574              :     /// Get a handle on the freezer subsystem.
     575              :     fn freezer(&self) -> anyhow::Result<&FreezerController> {
     576            0 :         if let Some(Freezer(freezer)) = self
     577            0 :             .cgroup
     578            0 :             .subsystems()
     579            0 :             .iter()
     580            0 :             .find(|sub| matches!(sub, Freezer(_)))
     581              :         {
     582            0 :             Ok(freezer)
     583              :         } else {
     584            0 :             anyhow::bail!("could not find freezer subsystem")
     585              :         }
     586            0 :     }
     587              : 
     588              :     /// Attempt to freeze the cgroup.
     589            0 :     pub fn freeze(&self) -> anyhow::Result<()> {
     590            0 :         self.freezer()
     591            0 :             .context("failed to get freezer subsystem")?
     592            0 :             .freeze()
     593            0 :             .context("failed to freeze")
     594            0 :     }
     595              : 
     596              :     /// Attempt to thaw the cgroup.
     597            0 :     pub fn thaw(&self) -> anyhow::Result<()> {
     598            0 :         self.freezer()
     599            0 :             .context("failed to get freezer subsystem")?
     600            0 :             .thaw()
     601            0 :             .context("failed to thaw")
     602            0 :     }
     603              : 
     604              :     /// Get a handle on the memory subsystem.
     605              :     ///
      606              :     /// Note: this method does not require any locking because getting a handle
      607              :     /// to the subsystem does not access any of the files we care about, such as
      608              :     /// memory.high and memory.events.
     609              :     fn memory(&self) -> anyhow::Result<&MemController> {
     610            0 :         if let Some(Mem(memory)) = self
     611            0 :             .cgroup
     612            0 :             .subsystems()
     613            0 :             .iter()
     614            0 :             .find(|sub| matches!(sub, Mem(_)))
     615              :         {
     616            0 :             Ok(memory)
     617              :         } else {
     618            0 :             anyhow::bail!("could not find memory subsystem")
     619              :         }
     620            0 :     }
     621              : 
     622              :     /// Get cgroup current memory usage.
     623            0 :     pub fn current_memory_usage(&self) -> anyhow::Result<u64> {
     624            0 :         Ok(self
     625            0 :             .memory()
     626            0 :             .context("failed to get memory subsystem")?
     627            0 :             .memory_stat()
     628              :             .usage_in_bytes)
     629            0 :     }
     630              : 
     631              :     /// Set cgroup memory.high threshold.
     632            0 :     pub fn set_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
     633            0 :         self.memory()
     634            0 :             .context("failed to get memory subsystem")?
     635            0 :             .set_mem(cgroups_rs::memory::SetMemory {
     636            0 :                 low: None,
     637            0 :                 high: Some(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)),
     638            0 :                 min: None,
     639            0 :                 max: None,
     640            0 :             })
     641            0 :             .context("failed to set memory.high")
     642            0 :     }
     643              : 
     644              :     /// Set cgroup memory.high and memory.max.
     645            0 :     pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
     646            0 :         info!(
     647            0 :             limits.high,
     648            0 :             limits.max,
     649            0 :             path = self.path(),
     650            0 :             "writing new memory limits",
     651            0 :         );
     652            0 :         self.memory()
     653            0 :             .context("failed to get memory subsystem while setting memory limits")?
     654            0 :             .set_mem(cgroups_rs::memory::SetMemory {
     655            0 :                 min: None,
     656            0 :                 low: None,
     657            0 :                 high: Some(MaxValue::Value(
     658            0 :                     u64::min(limits.high, i64::MAX as u64) as i64
     659            0 :                 )),
     660            0 :                 max: Some(MaxValue::Value(u64::min(limits.max, i64::MAX as u64) as i64)),
     661            0 :             })
     662            0 :             .context("failed to set memory limits")
     663            0 :     }
     664              : 
     665              :     /// Given some amount of available memory, set the desired cgroup memory limits
     666            0 :     pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
     667            0 :         let new_high = self.config.calculate_memory_high_value(available_memory);
     668            0 :         let limits = MemoryLimits::new(new_high, available_memory);
     669            0 :         info!(
     670            0 :             path = self.path(),
     671            0 :             memory = ?limits,
     672            0 :             "setting cgroup memory",
     673            0 :         );
     674            0 :         self.set_limits(&limits)
     675            0 :             .context("failed to set cgroup memory limits")?;
     676            0 :         Ok(())
     677            0 :     }
     678              : 
     679              :     /// Get memory.high threshold.
     680            0 :     pub fn get_high_bytes(&self) -> anyhow::Result<u64> {
     681            0 :         let high = self
     682            0 :             .memory()
     683            0 :             .context("failed to get memory subsystem while getting memory statistics")?
     684            0 :             .get_mem()
     685            0 :             .map(|mem| mem.high)
     686            0 :             .context("failed to get memory statistics from subsystem")?;
     687            0 :         match high {
     688            0 :             Some(MaxValue::Max) => Ok(i64::MAX as u64),
     689            0 :             Some(MaxValue::Value(high)) => Ok(high as u64),
     690            0 :             None => anyhow::bail!("failed to read memory.high from memory subsystem"),
     691              :         }
     692            0 :     }
     693              : }
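
// (Illustrative sketch, not part of the original file.) How the pieces above are
// typically wired together: build the watcher, spawn its event loop, and feed it
// upscale notifications. The cgroup name and channel capacities are assumptions
// made for the example; in the real monitor this wiring lives outside this module.
#[allow(dead_code)]
async fn example_wiring() -> anyhow::Result<()> {
    use std::sync::Arc;

    // Channel the watcher uses to ask the dispatcher for upscale.
    let (upscale_requester, mut upscale_requests) = mpsc::channel::<()>(1);
    // Channel on which upscale decisions are delivered back to the watcher.
    let (upscale_tx, upscale_rx) = mpsc::channel::<Sequenced<Resources>>(1);

    let (watcher, memory_events) =
        CgroupWatcher::new("neon-test-cgroup".to_string(), upscale_requester)?;
    let watcher = Arc::new(watcher);

    // `watch` never returns on the happy path, so it is spawned and the
    // JoinHandle is not awaited (see the comments on `watch` above).
    let loop_handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(async move { watcher.watch(upscale_rx, memory_events).await })
    };

    // A real caller would forward items from `upscale_requests` to the
    // autoscaler-agent and send its responses as `Sequenced::new(resources)`
    // on `upscale_tx`.
    let _ = (upscale_requests.recv().await, upscale_tx, loop_handle);
    Ok(())
}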
        

Generated by: LCOV version 2.1-beta