LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - filecache.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 0.0 % 57 0
Test Date: 2024-11-25 17:48:16 Functions: 0.0 % 13 0

            Line data    Source code
       1              : //! Logic for configuring and scaling the Postgres file cache.
       2              : 
       3              : use std::num::NonZeroU64;
       4              : 
       5              : use crate::MiB;
       6              : use anyhow::{anyhow, Context};
       7              : use tokio_postgres::{types::ToSql, Client, NoTls, Row};
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::{error, info};
      10              : 
/// Manages Postgres' file cache by keeping a connection open.
#[derive(Debug)]
pub struct FileCacheState {
    /// Handle used to run queries against Postgres. It is replaced with a fresh
    /// one when a query fails and the connection is re-established.
    client: Client,
    /// Connection string, kept so the connection can be re-established later.
    conn_str: String,
    /// Sizing parameters for the cache; validated when the state is created.
    pub(crate) config: FileCacheConfig,

    /// A token for cancelling spawned threads during shutdown.
    token: CancellationToken,
}
      21              : 
/// Tunable parameters that determine how large the file cache should be for a given
/// amount of total memory.
///
/// Use [`FileCacheConfig::validate`] to check that the fields are mutually consistent.
#[derive(Debug)]
pub struct FileCacheConfig {
    /// The size of the file cache, in terms of the size of the resource it consumes
    /// (currently: only memory)
    ///
    /// For example, setting `resource_multiplier = 0.75` gives the cache a target size of 75% of
    /// total resources.
    ///
    /// This value must be strictly between 0 and 1.
    resource_multiplier: f64,

    /// The required minimum amount of memory, in bytes, that must remain available
    /// after subtracting the file cache.
    ///
    /// This value must be non-zero.
    min_remaining_after_cache: NonZeroU64,

    /// Controls the rate of increase in the file cache's size as it grows from zero
    /// (when total resources equals min_remaining_after_cache) to the desired size based on
    /// `resource_multiplier`.
    ///
    /// A `spread_factor` of zero means that all additional resources will go to the cache until it
    /// reaches the desired size. Setting `spread_factor` to N roughly means "for every 1 byte added to
    /// the cache's size, N bytes are reserved for the rest of the system, until the cache gets to
    /// its desired size".
    ///
    /// This value must be >= 0, and must retain an increase that is more than what would be given by
    /// `resource_multiplier`. For example, setting `resource_multiplier` = 0.75 but `spread_factor` = 1
    /// would be invalid, because `spread_factor` would induce only 50% usage - never reaching the 75%
    /// as desired by `resource_multiplier`.
    ///
    /// `spread_factor` is too large if `(spread_factor + 1) * resource_multiplier >= 1`.
    spread_factor: f64,
}
      56              : 
      57              : impl Default for FileCacheConfig {
      58            0 :     fn default() -> Self {
      59            0 :         Self {
      60            0 :             resource_multiplier: 0.75,
      61            0 :             // 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
      62            0 :             // memory, the kernel will just evict from its page cache, rather than e.g. killing
      63            0 :             // everything.
      64            0 :             min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
      65            0 :             spread_factor: 0.1,
      66            0 :         }
      67            0 :     }
      68              : }
      69              : 
      70              : impl FileCacheConfig {
      71              :     /// Make sure fields of the config are consistent.
      72            0 :     pub fn validate(&self) -> anyhow::Result<()> {
      73            0 :         // Single field validity
      74            0 :         anyhow::ensure!(
      75            0 :             0.0 < self.resource_multiplier && self.resource_multiplier < 1.0,
      76            0 :             "resource_multiplier must be between 0.0 and 1.0 exclusive, got {}",
      77              :             self.resource_multiplier
      78              :         );
      79            0 :         anyhow::ensure!(
      80            0 :             self.spread_factor >= 0.0,
      81            0 :             "spread_factor must be >= 0, got {}",
      82              :             self.spread_factor
      83              :         );
      84              : 
      85              :         // Check that `resource_multiplier` and `spread_factor` are valid w.r.t. each other.
      86              :         //
      87              :         // As shown in `calculate_cache_size`, we have two lines resulting from `resource_multiplier` and
      88              :         // `spread_factor`, respectively. They are:
      89              :         //
      90              :         //                 `total`           `min_remaining_after_cache`
      91              :         //   size = ————————————————————— - —————————————————————————————
      92              :         //           `spread_factor` + 1         `spread_factor` + 1
      93              :         //
      94              :         // and
      95              :         //
      96              :         //   size = `resource_multiplier` × total
      97              :         //
      98              :         // .. where `total` is the total resources. These are isomorphic to the typical 'y = mx + b'
      99              :         // form, with y = "size" and x = "total".
     100              :         //
     101              :         // These lines intersect at:
     102              :         //
     103              :         //               `min_remaining_after_cache`
     104              :         //   ———————————————————————————————————————————————————
     105              :         //    1 - `resource_multiplier` × (`spread_factor` + 1)
     106              :         //
     107              :         // We want to ensure that this value (a) exists, and (b) is >= `min_remaining_after_cache`. This is
     108              :         // guaranteed when '`resource_multiplier` × (`spread_factor` + 1)' is less than 1.
     109              :         // (We also need it to be >= 0, but that's already guaranteed.)
     110              : 
     111            0 :         let intersect_factor = self.resource_multiplier * (self.spread_factor + 1.0);
     112            0 :         anyhow::ensure!(
     113            0 :             intersect_factor < 1.0,
     114            0 :             "incompatible resource_multipler and spread_factor"
     115              :         );
     116            0 :         Ok(())
     117            0 :     }
     118              : 
     119              :     /// Calculate the desired size of the cache, given the total memory
     120            0 :     pub fn calculate_cache_size(&self, total: u64) -> u64 {
     121            0 :         // *Note*: all units are in bytes, until the very last line.
     122            0 :         let available = total.saturating_sub(self.min_remaining_after_cache.get());
     123            0 :         if available == 0 {
     124            0 :             return 0;
     125            0 :         }
     126            0 : 
     127            0 :         // Conversions to ensure we don't overflow from floating-point ops
     128            0 :         let size_from_spread =
     129            0 :             i64::max(0, (available as f64 / (1.0 + self.spread_factor)) as i64) as u64;
     130            0 : 
     131            0 :         let size_from_normal = (total as f64 * self.resource_multiplier) as u64;
     132            0 : 
     133            0 :         let byte_size = u64::min(size_from_spread, size_from_normal);
     134            0 : 
     135            0 :         // The file cache operates in units of mebibytes, so the sizes we produce should
     136            0 :         // be rounded to a mebibyte. We round down to be conservative.
     137            0 :         byte_size / MiB * MiB
     138            0 :     }
     139              : }
     140              : 
     141              : impl FileCacheState {
     142              :     /// Connect to the file cache.
     143            0 :     #[tracing::instrument(skip_all, fields(%conn_str, ?config))]
     144              :     pub async fn new(
     145              :         conn_str: &str,
     146              :         config: FileCacheConfig,
     147              :         token: CancellationToken,
     148              :     ) -> anyhow::Result<Self> {
     149              :         config.validate().context("file cache config is invalid")?;
     150              : 
     151              :         info!(conn_str, "connecting to Postgres file cache");
     152              :         let client = FileCacheState::connect(conn_str, token.clone())
     153              :             .await
     154              :             .context("failed to connect to postgres file cache")?;
     155              : 
     156              :         let conn_str = conn_str.to_string();
     157              :         Ok(Self {
     158              :             client,
     159              :             config,
     160              :             conn_str,
     161              :             token,
     162              :         })
     163              :     }
     164              : 
     165              :     /// Connect to Postgres.
     166              :     ///
     167              :     /// Aborts the spawned thread if the kill signal is received. This is not
     168              :     /// a method as it is called in [`FileCacheState::new`].
     169            0 :     #[tracing::instrument(skip_all, fields(%conn_str))]
     170              :     async fn connect(conn_str: &str, token: CancellationToken) -> anyhow::Result<Client> {
     171              :         let (client, conn) = tokio_postgres::connect(conn_str, NoTls)
     172              :             .await
     173              :             .context("failed to connect to pg client")?;
     174              : 
     175              :         // The connection object performs the actual communication with the database,
     176              :         // so spawn it off to run on its own. See tokio-postgres docs.
     177              :         crate::spawn_with_cancel(
     178              :             token,
     179            0 :             |res| {
     180            0 :                 if let Err(error) = res {
     181            0 :                     error!(%error, "postgres error")
     182            0 :                 }
     183            0 :             },
     184              :             conn,
     185              :         );
     186              : 
     187              :         Ok(client)
     188              :     }
     189              : 
     190              :     /// Execute a query with a retry if necessary.
     191              :     ///
     192              :     /// If the initial query fails, we restart the database connection and attempt
     193              :     /// if again.
     194            0 :     #[tracing::instrument(skip_all, fields(%statement))]
     195              :     pub async fn query_with_retry(
     196              :         &mut self,
     197              :         statement: &str,
     198              :         params: &[&(dyn ToSql + Sync)],
     199              :     ) -> anyhow::Result<Vec<Row>> {
     200              :         match self
     201              :             .client
     202              :             .query(statement, params)
     203              :             .await
     204              :             .context("failed to execute query")
     205              :         {
     206              :             Ok(rows) => Ok(rows),
     207              :             Err(e) => {
     208              :                 error!(error = ?e, "postgres error: {e} -> retrying");
     209              : 
     210              :                 let client = FileCacheState::connect(&self.conn_str, self.token.clone())
     211              :                     .await
     212              :                     .context("failed to connect to postgres file cache")?;
     213              :                 info!("successfully reconnected to postgres client");
     214              : 
     215              :                 // Replace the old client and attempt the query with the new one
     216              :                 self.client = client;
     217              :                 self.client
     218              :                     .query(statement, params)
     219              :                     .await
     220              :                     .context("failed to execute query a second time")
     221              :             }
     222              :         }
     223              :     }
     224              : 
     225              :     /// Get the current size of the file cache.
     226            0 :     #[tracing::instrument(skip_all)]
     227              :     pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
     228              :         self.query_with_retry(
     229              :             // The file cache GUC variable is in MiB, but the conversion with
     230              :             // pg_size_bytes means that the end result we get is in bytes.
     231              :             "SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit'));",
     232              :             &[],
     233              :         )
     234              :         .await
     235              :         .context("failed to query pg for file cache size")?
     236              :         .first()
     237            0 :         .ok_or_else(|| anyhow!("file cache size query returned no rows"))?
     238              :         // pg_size_bytes returns a bigint which is the same as an i64.
     239              :         .try_get::<_, i64>(0)
     240              :         // Since the size of the table is not negative, the cast is sound.
     241            0 :         .map(|bytes| bytes as u64)
     242              :         .context("failed to extract file cache size from query result")
     243              :     }
     244              : 
     245              :     /// Attempt to set the file cache size, returning the size it was actually
     246              :     /// set to.
     247            0 :     #[tracing::instrument(skip_all, fields(%num_bytes))]
     248              :     pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result<u64> {
     249              :         let max_bytes = self
     250              :             // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes
     251              :             // means that the end result we get is in bytes.
     252              :             .query_with_retry(
     253              :                 "SELECT pg_size_bytes(current_setting('neon.max_file_cache_size'));",
     254              :                 &[],
     255              :             )
     256              :             .await
     257              :             .context("failed to query pg for max file cache size")?
     258              :             .first()
     259            0 :             .ok_or_else(|| anyhow!("max file cache size query returned no rows"))?
     260              :             .try_get::<_, i64>(0)
     261            0 :             .map(|bytes| bytes as u64)
     262              :             .context("failed to extract max file cache size from query result")?;
     263              : 
     264              :         let max_mb = max_bytes / MiB;
     265              :         let num_mb = u64::min(num_bytes, max_bytes) / MiB;
     266              : 
     267              :         let capped = if num_bytes > max_bytes {
     268              :             " (capped by maximum size)"
     269              :         } else {
     270              :             ""
     271              :         };
     272              : 
     273              :         info!(
     274              :             size = num_mb,
     275              :             max = max_mb,
     276              :             "updating file cache size {capped}",
     277              :         );
     278              : 
     279              :         // note: even though the normal ways to get the cache size produce values with trailing "MB"
     280              :         // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format
     281              :         // it expects to set the value is "integer number of MB" without trailing units.
     282              :         // For some reason, this *really* wasn't working with normal arguments, so that's
     283              :         // why we're constructing the query here.
     284              :         self.client
     285              :             .query(
     286              :                 &format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
     287              :                 &[],
     288              :             )
     289              :             .await
     290              :             .context("failed to change file cache size limit")?;
     291              : 
     292              :         // must use pg_reload_conf to have the settings change take effect
     293              :         self.client
     294              :             .execute("SELECT pg_reload_conf();", &[])
     295              :             .await
     296              :             .context("failed to reload config")?;
     297              : 
     298              :         Ok(num_mb * MiB)
     299              :     }
     300              : }
        

Generated by: LCOV version 2.1-beta