LCOV - differential code coverage report
Current view: top level - libs/vm_monitor/src - filecache.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 78 0 78
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 31 0 31
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Logic for configuring and scaling the Postgres file cache.
       2                 : 
       3                 : use std::num::NonZeroU64;
       4                 : 
       5                 : use crate::MiB;
       6                 : use anyhow::{anyhow, Context};
       7                 : use tokio_postgres::{types::ToSql, Client, NoTls, Row};
       8                 : use tokio_util::sync::CancellationToken;
       9                 : use tracing::{error, info};
      10                 : 
/// Manages Postgres' file cache by keeping a connection open.
///
/// Besides the live client, this keeps the connection string and cancellation token
/// around so that `query_with_retry` can re-establish the connection after a failure.
#[derive(Debug)]
pub struct FileCacheState {
    /// Client used to send queries to Postgres.
    client: Client,
    /// Connection string the client was created from; reused when reconnecting.
    conn_str: String,
    /// Parameters that determine the cache size; checked by `FileCacheConfig::validate`
    /// in `FileCacheState::new` before connecting.
    pub(crate) config: FileCacheConfig,

    /// A token for cancelling spawned threads during shutdown. It is handed to
    /// `crate::spawn_with_cancel` every time a connection is established.
    token: CancellationToken,
}
      21                 : 
/// Parameters controlling how large the file cache should be for a given amount of
/// total resources.
///
/// The constraints documented on each field are enforced by `validate`; the fields are
/// combined into a concrete size by `calculate_cache_size`.
#[derive(Debug)]
pub struct FileCacheConfig {
    /// Whether the file cache is *actually* stored in memory (e.g. by writing to
    /// a tmpfs or shmem file). If true, the size of the file cache will be counted against the
    /// memory available for the cgroup.
    pub(crate) in_memory: bool,

    /// The size of the file cache, in terms of the size of the resource it consumes
    /// (currently: only memory)
    ///
    /// For example, setting `resource_multiplier = 0.75` gives the cache a target size of 75% of total
    /// resources.
    ///
    /// This value must be strictly between 0 and 1.
    resource_multiplier: f64,

    /// The required minimum amount of memory, in bytes, that must remain available
    /// after subtracting the file cache.
    ///
    /// This value must be non-zero.
    min_remaining_after_cache: NonZeroU64,

    /// Controls the rate of increase in the file cache's size as it grows from zero
    /// (when total resources equals min_remaining_after_cache) to the desired size based on
    /// `resource_multiplier`.
    ///
    /// A `spread_factor` of zero means that all additional resources will go to the cache until it
    /// reaches the desired size. Setting `spread_factor` to N roughly means "for every 1 byte added to
    /// the cache's size, N bytes are reserved for the rest of the system, until the cache gets to
    /// its desired size".
    ///
    /// This value must be >= 0, and must retain an increase that is more than what would be given by
    /// `resource_multiplier`. For example, setting `resource_multiplier` = 0.75 but `spread_factor` = 1
    /// would be invalid, because `spread_factor` would induce only 50% usage - never reaching the 75%
    /// as desired by `resource_multiplier`.
    ///
    /// `spread_factor` is too large if `(spread_factor + 1) * resource_multiplier >= 1`.
    spread_factor: f64,
}
      61                 : 
      62                 : impl FileCacheConfig {
      63               0 :     pub fn default_in_memory() -> Self {
      64               0 :         Self {
      65               0 :             in_memory: true,
      66               0 :             // 75 %
      67               0 :             resource_multiplier: 0.75,
      68               0 :             // 640 MiB; (512 + 128)
      69               0 :             min_remaining_after_cache: NonZeroU64::new(640 * MiB).unwrap(),
      70               0 :             // ensure any increase in file cache size is split 90-10 with 10% to other memory
      71               0 :             spread_factor: 0.1,
      72               0 :         }
      73               0 :     }
      74                 : 
      75               0 :     pub fn default_on_disk() -> Self {
      76               0 :         Self {
      77               0 :             in_memory: false,
      78               0 :             resource_multiplier: 0.75,
      79               0 :             // 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
      80               0 :             // memory, the kernel will just evict from its page cache, rather than e.g. killing
      81               0 :             // everything.
      82               0 :             min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
      83               0 :             spread_factor: 0.1,
      84               0 :         }
      85               0 :     }
      86                 : 
      87                 :     /// Make sure fields of the config are consistent.
      88               0 :     pub fn validate(&self) -> anyhow::Result<()> {
      89               0 :         // Single field validity
      90               0 :         anyhow::ensure!(
      91               0 :             0.0 < self.resource_multiplier && self.resource_multiplier < 1.0,
      92               0 :             "resource_multiplier must be between 0.0 and 1.0 exclusive, got {}",
      93                 :             self.resource_multiplier
      94                 :         );
      95               0 :         anyhow::ensure!(
      96               0 :             self.spread_factor >= 0.0,
      97               0 :             "spread_factor must be >= 0, got {}",
      98                 :             self.spread_factor
      99                 :         );
     100                 : 
     101                 :         // Check that `resource_multiplier` and `spread_factor` are valid w.r.t. each other.
     102                 :         //
     103                 :         // As shown in `calculate_cache_size`, we have two lines resulting from `resource_multiplier` and
     104                 :         // `spread_factor`, respectively. They are:
     105                 :         //
     106                 :         //                 `total`           `min_remaining_after_cache`
     107                 :         //   size = ————————————————————— - —————————————————————————————
     108                 :         //           `spread_factor` + 1         `spread_factor` + 1
     109                 :         //
     110                 :         // and
     111                 :         //
     112                 :         //   size = `resource_multiplier` × total
     113                 :         //
     114                 :         // .. where `total` is the total resources. These are isomorphic to the typical 'y = mx + b'
     115                 :         // form, with y = "size" and x = "total".
     116                 :         //
     117                 :         // These lines intersect at:
     118                 :         //
     119                 :         //               `min_remaining_after_cache`
     120                 :         //   ———————————————————————————————————————————————————
     121                 :         //    1 - `resource_multiplier` × (`spread_factor` + 1)
     122                 :         //
     123                 :         // We want to ensure that this value (a) exists, and (b) is >= `min_remaining_after_cache`. This is
     124                 :         // guaranteed when '`resource_multiplier` × (`spread_factor` + 1)' is less than 1.
     125                 :         // (We also need it to be >= 0, but that's already guaranteed.)
     126                 : 
     127               0 :         let intersect_factor = self.resource_multiplier * (self.spread_factor + 1.0);
     128               0 :         anyhow::ensure!(
     129               0 :             intersect_factor < 1.0,
     130               0 :             "incompatible resource_multipler and spread_factor"
     131                 :         );
     132               0 :         Ok(())
     133               0 :     }
     134                 : 
     135                 :     /// Calculate the desired size of the cache, given the total memory
     136               0 :     pub fn calculate_cache_size(&self, total: u64) -> u64 {
     137               0 :         // *Note*: all units are in bytes, until the very last line.
     138               0 :         let available = total.saturating_sub(self.min_remaining_after_cache.get());
     139               0 :         if available == 0 {
     140               0 :             return 0;
     141               0 :         }
     142               0 : 
     143               0 :         // Conversions to ensure we don't overflow from floating-point ops
     144               0 :         let size_from_spread =
     145               0 :             i64::max(0, (available as f64 / (1.0 + self.spread_factor)) as i64) as u64;
     146               0 : 
     147               0 :         let size_from_normal = (total as f64 * self.resource_multiplier) as u64;
     148               0 : 
     149               0 :         let byte_size = u64::min(size_from_spread, size_from_normal);
     150               0 : 
     151               0 :         // The file cache operates in units of mebibytes, so the sizes we produce should
     152               0 :         // be rounded to a mebibyte. We round down to be conservative.
     153               0 :         byte_size / MiB * MiB
     154               0 :     }
     155                 : }
     156                 : 
     157                 : impl FileCacheState {
     158                 :     /// Connect to the file cache.
     159               0 :     #[tracing::instrument(skip_all, fields(%conn_str, ?config))]
     160                 :     pub async fn new(
     161                 :         conn_str: &str,
     162                 :         config: FileCacheConfig,
     163                 :         token: CancellationToken,
     164                 :     ) -> anyhow::Result<Self> {
     165                 :         config.validate().context("file cache config is invalid")?;
     166                 : 
     167               0 :         info!(conn_str, "connecting to Postgres file cache");
     168                 :         let client = FileCacheState::connect(conn_str, token.clone())
     169                 :             .await
     170                 :             .context("failed to connect to postgres file cache")?;
     171                 : 
     172                 :         let conn_str = conn_str.to_string();
     173                 :         Ok(Self {
     174                 :             client,
     175                 :             config,
     176                 :             conn_str,
     177                 :             token,
     178                 :         })
     179                 :     }
     180                 : 
     181                 :     /// Connect to Postgres.
     182                 :     ///
     183                 :     /// Aborts the spawned thread if the kill signal is received. This is not
     184                 :     /// a method as it is called in [`FileCacheState::new`].
     185               0 :     #[tracing::instrument(skip_all, fields(%conn_str))]
     186                 :     async fn connect(conn_str: &str, token: CancellationToken) -> anyhow::Result<Client> {
     187                 :         let (client, conn) = tokio_postgres::connect(conn_str, NoTls)
     188                 :             .await
     189                 :             .context("failed to connect to pg client")?;
     190                 : 
     191                 :         // The connection object performs the actual communication with the database,
     192                 :         // so spawn it off to run on its own. See tokio-postgres docs.
     193                 :         crate::spawn_with_cancel(
     194                 :             token,
     195                 :             |res| {
     196               0 :                 if let Err(error) = res {
     197               0 :                     error!(%error, "postgres error")
     198               0 :                 }
     199               0 :             },
     200                 :             conn,
     201                 :         );
     202                 : 
     203                 :         Ok(client)
     204                 :     }
     205                 : 
     206                 :     /// Execute a query with a retry if necessary.
     207                 :     ///
     208                 :     /// If the initial query fails, we restart the database connection and attempt
     209                 :     /// if again.
     210               0 :     #[tracing::instrument(skip_all, fields(%statement))]
     211                 :     pub async fn query_with_retry(
     212                 :         &mut self,
     213                 :         statement: &str,
     214                 :         params: &[&(dyn ToSql + Sync)],
     215                 :     ) -> anyhow::Result<Vec<Row>> {
     216                 :         match self
     217                 :             .client
     218                 :             .query(statement, params)
     219                 :             .await
     220                 :             .context("failed to execute query")
     221                 :         {
     222                 :             Ok(rows) => Ok(rows),
     223                 :             Err(e) => {
     224               0 :                 error!(error = ?e, "postgres error: {e} -> retrying");
     225                 : 
     226                 :                 let client = FileCacheState::connect(&self.conn_str, self.token.clone())
     227                 :                     .await
     228                 :                     .context("failed to connect to postgres file cache")?;
     229               0 :                 info!("successfully reconnected to postgres client");
     230                 : 
     231                 :                 // Replace the old client and attempt the query with the new one
     232                 :                 self.client = client;
     233                 :                 self.client
     234                 :                     .query(statement, params)
     235                 :                     .await
     236                 :                     .context("failed to execute query a second time")
     237                 :             }
     238                 :         }
     239                 :     }
     240                 : 
     241                 :     /// Get the current size of the file cache.
     242               0 :     #[tracing::instrument(skip_all)]
     243                 :     pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
     244                 :         self.query_with_retry(
     245                 :             // The file cache GUC variable is in MiB, but the conversion with
     246                 :             // pg_size_bytes means that the end result we get is in bytes.
     247                 :             "SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit'));",
     248                 :             &[],
     249                 :         )
     250                 :         .await
     251                 :         .context("failed to query pg for file cache size")?
     252                 :         .first()
     253               0 :         .ok_or_else(|| anyhow!("file cache size query returned no rows"))?
     254                 :         // pg_size_bytes returns a bigint which is the same as an i64.
     255                 :         .try_get::<_, i64>(0)
     256                 :         // Since the size of the table is not negative, the cast is sound.
     257               0 :         .map(|bytes| bytes as u64)
     258                 :         .context("failed to extract file cache size from query result")
     259                 :     }
     260                 : 
     261                 :     /// Attempt to set the file cache size, returning the size it was actually
     262                 :     /// set to.
     263               0 :     #[tracing::instrument(skip_all, fields(%num_bytes))]
     264                 :     pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result<u64> {
     265                 :         let max_bytes = self
     266                 :             // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes
     267                 :             // means that the end result we get is in bytes.
     268                 :             .query_with_retry(
     269                 :                 "SELECT pg_size_bytes(current_setting('neon.max_file_cache_size'));",
     270                 :                 &[],
     271                 :             )
     272                 :             .await
     273                 :             .context("failed to query pg for max file cache size")?
     274                 :             .first()
     275               0 :             .ok_or_else(|| anyhow!("max file cache size query returned no rows"))?
     276                 :             .try_get::<_, i64>(0)
     277               0 :             .map(|bytes| bytes as u64)
     278                 :             .context("failed to extract max file cache size from query result")?;
     279                 : 
     280                 :         let max_mb = max_bytes / MiB;
     281                 :         let num_mb = u64::min(num_bytes, max_bytes) / MiB;
     282                 : 
     283                 :         let capped = if num_bytes > max_bytes {
     284                 :             " (capped by maximum size)"
     285                 :         } else {
     286                 :             ""
     287                 :         };
     288                 : 
     289               0 :         info!(
     290               0 :             size = num_mb,
     291               0 :             max = max_mb,
     292               0 :             "updating file cache size {capped}",
     293               0 :         );
     294                 : 
     295                 :         // note: even though the normal ways to get the cache size produce values with trailing "MB"
     296                 :         // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format
     297                 :         // it expects to set the value is "integer number of MB" without trailing units.
     298                 :         // For some reason, this *really* wasn't working with normal arguments, so that's
     299                 :         // why we're constructing the query here.
     300                 :         self.client
     301                 :             .query(
     302                 :                 &format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
     303                 :                 &[],
     304                 :             )
     305                 :             .await
     306                 :             .context("failed to change file cache size limit")?;
     307                 : 
     308                 :         // must use pg_reload_conf to have the settings change take effect
     309                 :         self.client
     310                 :             .execute("SELECT pg_reload_conf();", &[])
     311                 :             .await
     312                 :             .context("failed to reload config")?;
     313                 : 
     314                 :         Ok(num_mb * MiB)
     315                 :     }
     316                 : }
        

Generated by: LCOV version 2.1-beta