LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - filecache.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 78 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 31 0

            Line data    Source code
       1              : //! Logic for configuring and scaling the Postgres file cache.
       2              : 
       3              : use std::num::NonZeroU64;
       4              : 
       5              : use crate::MiB;
       6              : use anyhow::{anyhow, Context};
       7              : use tokio_postgres::{types::ToSql, Client, NoTls, Row};
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::{error, info};
      10              : 
      11              : /// Manages Postgres' file cache by keeping a connection open.
      12            0 : #[derive(Debug)]
      13              : pub struct FileCacheState {
      14              :     client: Client,
      15              :     conn_str: String,
      16              :     pub(crate) config: FileCacheConfig,
      17              : 
      18              :     /// A token for cancelling spawned threads during shutdown.
      19              :     token: CancellationToken,
      20              : }
      21              : 
      22            0 : #[derive(Debug)]
      23              : pub struct FileCacheConfig {
      24              :     /// Whether the file cache is *actually* stored in memory (e.g. by writing to
      25              :     /// a tmpfs or shmem file). If true, the size of the file cache will be counted against the
      26              :     /// memory available for the cgroup.
      27              :     pub(crate) in_memory: bool,
      28              : 
      29              :     /// The size of the file cache, in terms of the size of the resource it consumes
      30              :     /// (currently: only memory)
      31              :     ///
      32              :     /// For example, setting `resource_multiplier = 0.75` gives the cache a target size of 75% of total
      33              :     /// resources.
      34              :     ///
      35              :     /// This value must be strictly between 0 and 1.
      36              :     resource_multiplier: f64,
      37              : 
      38              :     /// The required minimum amount of memory, in bytes, that must remain available
      39              :     /// after subtracting the file cache.
      40              :     ///
      41              :     /// This value must be non-zero.
      42              :     min_remaining_after_cache: NonZeroU64,
      43              : 
      44              :     /// Controls the rate of increase in the file cache's size as it grows from zero
      45              :     /// (when total resources equals min_remaining_after_cache) to the desired size based on
      46              :     /// `resource_multiplier`.
      47              :     ///
      48              :     /// A `spread_factor` of zero means that all additional resources will go to the cache until it
      49              :     /// reaches the desired size. Setting `spread_factor` to N roughly means "for every 1 byte added to
      50              :     /// the cache's size, N bytes are reserved for the rest of the system, until the cache gets to
      51              :     /// its desired size".
      52              :     ///
      53              :     /// This value must be >= 0, and must retain an increase that is more than what would be given by
      54              :     /// `resource_multiplier`. For example, setting `resource_multiplier` = 0.75 but `spread_factor` = 1
      55              :     /// would be invalid, because `spread_factor` would induce only 50% usage - never reaching the 75%
      56              :     /// as desired by `resource_multiplier`.
      57              :     ///
      58              :     /// `spread_factor` is too large if `(spread_factor + 1) * resource_multiplier >= 1`.
      59              :     spread_factor: f64,
      60              : }
      61              : 
      62              : impl FileCacheConfig {
      63            0 :     pub fn default_in_memory() -> Self {
      64            0 :         Self {
      65            0 :             in_memory: true,
      66            0 :             // 75 %
      67            0 :             resource_multiplier: 0.75,
      68            0 :             // 640 MiB; (512 + 128)
      69            0 :             min_remaining_after_cache: NonZeroU64::new(640 * MiB).unwrap(),
      70            0 :             // ensure any increase in file cache size is split 90-10 with 10% to other memory
      71            0 :             spread_factor: 0.1,
      72            0 :         }
      73            0 :     }
      74              : 
      75            0 :     pub fn default_on_disk() -> Self {
      76            0 :         Self {
      77            0 :             in_memory: false,
      78            0 :             resource_multiplier: 0.75,
      79            0 :             // 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
      80            0 :             // memory, the kernel will just evict from its page cache, rather than e.g. killing
      81            0 :             // everything.
      82            0 :             min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
      83            0 :             spread_factor: 0.1,
      84            0 :         }
      85            0 :     }
      86              : 
      87              :     /// Make sure fields of the config are consistent.
      88            0 :     pub fn validate(&self) -> anyhow::Result<()> {
      89            0 :         // Single field validity
      90            0 :         anyhow::ensure!(
      91            0 :             0.0 < self.resource_multiplier && self.resource_multiplier < 1.0,
      92            0 :             "resource_multiplier must be between 0.0 and 1.0 exclusive, got {}",
      93              :             self.resource_multiplier
      94              :         );
      95            0 :         anyhow::ensure!(
      96            0 :             self.spread_factor >= 0.0,
      97            0 :             "spread_factor must be >= 0, got {}",
      98              :             self.spread_factor
      99              :         );
     100              : 
     101              :         // Check that `resource_multiplier` and `spread_factor` are valid w.r.t. each other.
     102              :         //
     103              :         // As shown in `calculate_cache_size`, we have two lines resulting from `resource_multiplier` and
     104              :         // `spread_factor`, respectively. They are:
     105              :         //
     106              :         //                 `total`           `min_remaining_after_cache`
     107              :         //   size = ————————————————————— - —————————————————————————————
     108              :         //           `spread_factor` + 1         `spread_factor` + 1
     109              :         //
     110              :         // and
     111              :         //
     112              :         //   size = `resource_multiplier` × total
     113              :         //
     114              :         // .. where `total` is the total resources. These are isomorphic to the typical 'y = mx + b'
     115              :         // form, with y = "size" and x = "total".
     116              :         //
     117              :         // These lines intersect at:
     118              :         //
     119              :         //               `min_remaining_after_cache`
     120              :         //   ———————————————————————————————————————————————————
     121              :         //    1 - `resource_multiplier` × (`spread_factor` + 1)
     122              :         //
     123              :         // We want to ensure that this value (a) exists, and (b) is >= `min_remaining_after_cache`. This is
     124              :         // guaranteed when '`resource_multiplier` × (`spread_factor` + 1)' is less than 1.
     125              :         // (We also need it to be >= 0, but that's already guaranteed.)
     126              : 
     127            0 :         let intersect_factor = self.resource_multiplier * (self.spread_factor + 1.0);
     128            0 :         anyhow::ensure!(
     129            0 :             intersect_factor < 1.0,
     130            0 :             "incompatible resource_multipler and spread_factor"
     131              :         );
     132            0 :         Ok(())
     133            0 :     }
     134              : 
     135              :     /// Calculate the desired size of the cache, given the total memory
     136            0 :     pub fn calculate_cache_size(&self, total: u64) -> u64 {
     137            0 :         // *Note*: all units are in bytes, until the very last line.
     138            0 :         let available = total.saturating_sub(self.min_remaining_after_cache.get());
     139            0 :         if available == 0 {
     140            0 :             return 0;
     141            0 :         }
     142            0 : 
     143            0 :         // Conversions to ensure we don't overflow from floating-point ops
     144            0 :         let size_from_spread =
     145            0 :             i64::max(0, (available as f64 / (1.0 + self.spread_factor)) as i64) as u64;
     146            0 : 
     147            0 :         let size_from_normal = (total as f64 * self.resource_multiplier) as u64;
     148            0 : 
     149            0 :         let byte_size = u64::min(size_from_spread, size_from_normal);
     150            0 : 
     151            0 :         // The file cache operates in units of mebibytes, so the sizes we produce should
     152            0 :         // be rounded to a mebibyte. We round down to be conservative.
     153            0 :         byte_size / MiB * MiB
     154            0 :     }
     155              : }
     156              : 
     157              : impl FileCacheState {
     158              :     /// Connect to the file cache.
     159            0 :     #[tracing::instrument(skip_all, fields(%conn_str, ?config))]
     160              :     pub async fn new(
     161              :         conn_str: &str,
     162              :         config: FileCacheConfig,
     163              :         token: CancellationToken,
     164              :     ) -> anyhow::Result<Self> {
     165              :         config.validate().context("file cache config is invalid")?;
     166              : 
     167            0 :         info!(conn_str, "connecting to Postgres file cache");
     168              :         let client = FileCacheState::connect(conn_str, token.clone())
     169              :             .await
     170              :             .context("failed to connect to postgres file cache")?;
     171              : 
     172              :         let conn_str = conn_str.to_string();
     173              :         Ok(Self {
     174              :             client,
     175              :             config,
     176              :             conn_str,
     177              :             token,
     178              :         })
     179              :     }
     180              : 
     181              :     /// Connect to Postgres.
     182              :     ///
     183              :     /// Aborts the spawned thread if the kill signal is received. This is not
     184              :     /// a method as it is called in [`FileCacheState::new`].
     185            0 :     #[tracing::instrument(skip_all, fields(%conn_str))]
     186              :     async fn connect(conn_str: &str, token: CancellationToken) -> anyhow::Result<Client> {
     187              :         let (client, conn) = tokio_postgres::connect(conn_str, NoTls)
     188              :             .await
     189              :             .context("failed to connect to pg client")?;
     190              : 
     191              :         // The connection object performs the actual communication with the database,
     192              :         // so spawn it off to run on its own. See tokio-postgres docs.
     193              :         crate::spawn_with_cancel(
     194              :             token,
     195              :             |res| {
     196            0 :                 if let Err(error) = res {
     197            0 :                     error!(%error, "postgres error")
     198            0 :                 }
     199            0 :             },
     200              :             conn,
     201              :         );
     202              : 
     203              :         Ok(client)
     204              :     }
     205              : 
     206              :     /// Execute a query with a retry if necessary.
     207              :     ///
     208              :     /// If the initial query fails, we restart the database connection and attempt
     209              :     /// it again.
     210            0 :     #[tracing::instrument(skip_all, fields(%statement))]
     211              :     pub async fn query_with_retry(
     212              :         &mut self,
     213              :         statement: &str,
     214              :         params: &[&(dyn ToSql + Sync)],
     215              :     ) -> anyhow::Result<Vec<Row>> {
     216              :         match self
     217              :             .client
     218              :             .query(statement, params)
     219              :             .await
     220              :             .context("failed to execute query")
     221              :         {
     222              :             Ok(rows) => Ok(rows),
     223              :             Err(e) => {
     224            0 :                 error!(error = ?e, "postgres error: {e} -> retrying");
     225              : 
     226              :                 let client = FileCacheState::connect(&self.conn_str, self.token.clone())
     227              :                     .await
     228              :                     .context("failed to connect to postgres file cache")?;
     229            0 :                 info!("successfully reconnected to postgres client");
     230              : 
     231              :                 // Replace the old client and attempt the query with the new one
     232              :                 self.client = client;
     233              :                 self.client
     234              :                     .query(statement, params)
     235              :                     .await
     236              :                     .context("failed to execute query a second time")
     237              :             }
     238              :         }
     239              :     }
     240              : 
     241              :     /// Get the current size of the file cache.
     242            0 :     #[tracing::instrument(skip_all)]
     243              :     pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
     244              :         self.query_with_retry(
     245              :             // The file cache GUC variable is in MiB, but the conversion with
     246              :             // pg_size_bytes means that the end result we get is in bytes.
     247              :             "SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit'));",
     248              :             &[],
     249              :         )
     250              :         .await
     251              :         .context("failed to query pg for file cache size")?
     252              :         .first()
     253            0 :         .ok_or_else(|| anyhow!("file cache size query returned no rows"))?
     254              :         // pg_size_bytes returns a bigint which is the same as an i64.
     255              :         .try_get::<_, i64>(0)
     256              :         // Since the size of the table is not negative, the cast is sound.
     257            0 :         .map(|bytes| bytes as u64)
     258              :         .context("failed to extract file cache size from query result")
     259              :     }
     260              : 
     261              :     /// Attempt to set the file cache size, returning the size it was actually
     262              :     /// set to.
     263            0 :     #[tracing::instrument(skip_all, fields(%num_bytes))]
     264              :     pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result<u64> {
     265              :         let max_bytes = self
     266              :             // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes
     267              :             // means that the end result we get is in bytes.
     268              :             .query_with_retry(
     269              :                 "SELECT pg_size_bytes(current_setting('neon.max_file_cache_size'));",
     270              :                 &[],
     271              :             )
     272              :             .await
     273              :             .context("failed to query pg for max file cache size")?
     274              :             .first()
     275            0 :             .ok_or_else(|| anyhow!("max file cache size query returned no rows"))?
     276              :             .try_get::<_, i64>(0)
     277            0 :             .map(|bytes| bytes as u64)
     278              :             .context("failed to extract max file cache size from query result")?;
     279              : 
     280              :         let max_mb = max_bytes / MiB;
     281              :         let num_mb = u64::min(num_bytes, max_bytes) / MiB;
     282              : 
     283              :         let capped = if num_bytes > max_bytes {
     284              :             " (capped by maximum size)"
     285              :         } else {
     286              :             ""
     287              :         };
     288              : 
     289            0 :         info!(
     290            0 :             size = num_mb,
     291            0 :             max = max_mb,
     292            0 :             "updating file cache size {capped}",
     293            0 :         );
     294              : 
     295              :         // note: even though the normal ways to get the cache size produce values with trailing "MB"
     296              :         // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format
     297              :         // it expects to set the value is "integer number of MB" without trailing units.
     298              :         // For some reason, this *really* wasn't working with normal arguments, so that's
     299              :         // why we're constructing the query here.
     300              :         self.client
     301              :             .query(
     302              :                 &format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
     303              :                 &[],
     304              :             )
     305              :             .await
     306              :             .context("failed to change file cache size limit")?;
     307              : 
     308              :         // must use pg_reload_conf to have the settings change take effect
     309              :         self.client
     310              :             .execute("SELECT pg_reload_conf();", &[])
     311              :             .await
     312              :             .context("failed to reload config")?;
     313              : 
     314              :         Ok(num_mb * MiB)
     315              :     }
     316              : }
        

Generated by: LCOV version 2.1-beta