       1              : //! Logic for configuring and scaling the Postgres file cache.
       2              : 
       3              : use std::num::NonZeroU64;
       4              : 
       5              : use anyhow::{Context, anyhow};
       6              : use tokio_postgres::types::ToSql;
       7              : use tokio_postgres::{Client, NoTls, Row};
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::{error, info};
      10              : 
      11              : use crate::MiB;
      12              : 
/// Manages Postgres' file cache by keeping a connection open.
///
/// If a query fails, [`FileCacheState::query_with_retry`] reconnects using the stored
/// connection string and replaces the client.
#[derive(Debug)]
pub struct FileCacheState {
    /// The current Postgres client; replaced after a reconnect.
    client: Client,
    /// Connection string, kept so that the client can be re-established.
    conn_str: String,
    /// Sizing policy for the file cache.
    pub(crate) config: FileCacheConfig,

    /// A token for cancelling, during shutdown, the background connection futures that
    /// `connect` spawns via `crate::spawn_with_cancel`.
    token: CancellationToken,
}
      23              : 
/// Policy for sizing the Postgres file cache relative to the total available resources.
///
/// [`FileCacheConfig::validate`] checks that the fields are consistent with each other, and
/// [`FileCacheConfig::calculate_cache_size`] turns a total memory size into a target cache size.
#[derive(Debug)]
pub struct FileCacheConfig {
    /// The size of the file cache, in terms of the size of the resource it consumes
    /// (currently: only memory).
    ///
    /// For example, setting `resource_multiplier = 0.75` gives the cache a target size of 75% of
    /// total resources.
    ///
    /// This value must be strictly between 0 and 1.
    resource_multiplier: f64,

    /// The required minimum amount of memory, in bytes, that must remain available
    /// after subtracting the file cache.
    ///
    /// This value must be non-zero (enforced by the type).
    min_remaining_after_cache: NonZeroU64,

    /// Controls the rate of increase in the file cache's size as it grows from zero
    /// (when total resources equal `min_remaining_after_cache`) to the desired size based on
    /// `resource_multiplier`.
    ///
    /// A `spread_factor` of zero means that all additional resources will go to the cache until it
    /// reaches the desired size. Setting `spread_factor` to N roughly means "for every 1 byte added to
    /// the cache's size, N bytes are reserved for the rest of the system, until the cache gets to
    /// its desired size".
    ///
    /// This value must be >= 0, and small enough that growth at this rate still eventually reaches
    /// the size given by `resource_multiplier`. For example, `resource_multiplier = 0.75` with
    /// `spread_factor = 1` is invalid: a spread factor of 1 gives the cache less than 50% of the
    /// total resources, so it never reaches the 75% requested by `resource_multiplier`.
    ///
    /// `spread_factor` is too large if `(spread_factor + 1) * resource_multiplier >= 1`.
    spread_factor: f64,
}
      58              : 
      59              : impl Default for FileCacheConfig {
      60            0 :     fn default() -> Self {
      61            0 :         Self {
      62            0 :             resource_multiplier: 0.75,
      63            0 :             // 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
      64            0 :             // memory, the kernel will just evict from its page cache, rather than e.g. killing
      65            0 :             // everything.
      66            0 :             min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
      67            0 :             spread_factor: 0.1,
      68            0 :         }
      69            0 :     }
      70              : }
      71              : 
      72              : impl FileCacheConfig {
      73              :     /// Make sure fields of the config are consistent.
      74            0 :     pub fn validate(&self) -> anyhow::Result<()> {
      75              :         // Single field validity
      76            0 :         anyhow::ensure!(
      77            0 :             0.0 < self.resource_multiplier && self.resource_multiplier < 1.0,
      78            0 :             "resource_multiplier must be between 0.0 and 1.0 exclusive, got {}",
      79              :             self.resource_multiplier
      80              :         );
      81            0 :         anyhow::ensure!(
      82            0 :             self.spread_factor >= 0.0,
      83            0 :             "spread_factor must be >= 0, got {}",
      84              :             self.spread_factor
      85              :         );
      86              : 
      87              :         // Check that `resource_multiplier` and `spread_factor` are valid w.r.t. each other.
      88              :         //
      89              :         // As shown in `calculate_cache_size`, we have two lines resulting from `resource_multiplier` and
      90              :         // `spread_factor`, respectively. They are:
      91              :         //
      92              :         //                 `total`           `min_remaining_after_cache`
      93              :         //   size = ————————————————————— - —————————————————————————————
      94              :         //           `spread_factor` + 1         `spread_factor` + 1
      95              :         //
      96              :         // and
      97              :         //
      98              :         //   size = `resource_multiplier` × total
      99              :         //
     100              :         // .. where `total` is the total resources. These are isomorphic to the typical 'y = mx + b'
     101              :         // form, with y = "size" and x = "total".
     102              :         //
     103              :         // These lines intersect at:
     104              :         //
     105              :         //               `min_remaining_after_cache`
     106              :         //   ———————————————————————————————————————————————————
     107              :         //    1 - `resource_multiplier` × (`spread_factor` + 1)
     108              :         //
     109              :         // We want to ensure that this value (a) exists, and (b) is >= `min_remaining_after_cache`. This is
     110              :         // guaranteed when '`resource_multiplier` × (`spread_factor` + 1)' is less than 1.
     111              :         // (We also need it to be >= 0, but that's already guaranteed.)
     112              : 
     113            0 :         let intersect_factor = self.resource_multiplier * (self.spread_factor + 1.0);
     114            0 :         anyhow::ensure!(
     115            0 :             intersect_factor < 1.0,
     116            0 :             "incompatible resource_multipler and spread_factor"
     117              :         );
     118            0 :         Ok(())
     119            0 :     }
     120              : 
     121              :     /// Calculate the desired size of the cache, given the total memory
     122            0 :     pub fn calculate_cache_size(&self, total: u64) -> u64 {
     123            0 :         // *Note*: all units are in bytes, until the very last line.
     124            0 :         let available = total.saturating_sub(self.min_remaining_after_cache.get());
     125            0 :         if available == 0 {
     126            0 :             return 0;
     127            0 :         }
     128            0 : 
     129            0 :         // Conversions to ensure we don't overflow from floating-point ops
     130            0 :         let size_from_spread =
     131            0 :             i64::max(0, (available as f64 / (1.0 + self.spread_factor)) as i64) as u64;
     132            0 : 
     133            0 :         let size_from_normal = (total as f64 * self.resource_multiplier) as u64;
     134            0 : 
     135            0 :         let byte_size = u64::min(size_from_spread, size_from_normal);
     136            0 : 
     137            0 :         // The file cache operates in units of mebibytes, so the sizes we produce should
     138            0 :         // be rounded to a mebibyte. We round down to be conservative.
     139            0 :         byte_size / MiB * MiB
     140            0 :     }
     141              : }
     142              : 
     143              : impl FileCacheState {
     144              :     /// Connect to the file cache.
     145              :     #[tracing::instrument(skip_all, fields(%conn_str, ?config))]
     146              :     pub async fn new(
     147              :         conn_str: &str,
     148              :         config: FileCacheConfig,
     149              :         token: CancellationToken,
     150              :     ) -> anyhow::Result<Self> {
     151              :         config.validate().context("file cache config is invalid")?;
     152              : 
     153              :         info!(conn_str, "connecting to Postgres file cache");
     154              :         let client = FileCacheState::connect(conn_str, token.clone())
     155              :             .await
     156              :             .context("failed to connect to postgres file cache")?;
     157              : 
     158              :         let conn_str = conn_str.to_string();
     159              :         Ok(Self {
     160              :             client,
     161              :             config,
     162              :             conn_str,
     163              :             token,
     164              :         })
     165              :     }
     166              : 
     167              :     /// Connect to Postgres.
     168              :     ///
     169              :     /// Aborts the spawned thread if the kill signal is received. This is not
     170              :     /// a method as it is called in [`FileCacheState::new`].
     171              :     #[tracing::instrument(skip_all, fields(%conn_str))]
     172              :     async fn connect(conn_str: &str, token: CancellationToken) -> anyhow::Result<Client> {
     173              :         let (client, conn) = tokio_postgres::connect(conn_str, NoTls)
     174              :             .await
     175              :             .context("failed to connect to pg client")?;
     176              : 
     177              :         // The connection object performs the actual communication with the database,
     178              :         // so spawn it off to run on its own. See tokio-postgres docs.
     179              :         crate::spawn_with_cancel(
     180              :             token,
     181            0 :             |res| {
     182            0 :                 if let Err(e) = res {
     183            0 :                     error!(error = format_args!("{e:#}"), "postgres error");
     184            0 :                 }
     185            0 :             },
     186              :             conn,
     187              :         );
     188              : 
     189              :         Ok(client)
     190              :     }
     191              : 
     192              :     /// Execute a query with a retry if necessary.
     193              :     ///
     194              :     /// If the initial query fails, we restart the database connection and attempt
     195              :     /// if again.
     196              :     #[tracing::instrument(skip_all, fields(%statement))]
     197              :     pub async fn query_with_retry(
     198              :         &mut self,
     199              :         statement: &str,
     200              :         params: &[&(dyn ToSql + Sync)],
     201              :     ) -> anyhow::Result<Vec<Row>> {
     202              :         match self
     203              :             .client
     204              :             .query(statement, params)
     205              :             .await
     206              :             .context("failed to execute query")
     207              :         {
     208              :             Ok(rows) => Ok(rows),
     209              :             Err(e) => {
     210              :                 error!(error = format_args!("{e:#}"), "postgres error -> retrying");
     211              : 
     212              :                 let client = FileCacheState::connect(&self.conn_str, self.token.clone())
     213              :                     .await
     214              :                     .context("failed to connect to postgres file cache")?;
     215              :                 info!("successfully reconnected to postgres client");
     216              : 
     217              :                 // Replace the old client and attempt the query with the new one
     218              :                 self.client = client;
     219              :                 self.client
     220              :                     .query(statement, params)
     221              :                     .await
     222              :                     .context("failed to execute query a second time")
     223              :             }
     224              :         }
     225              :     }
     226              : 
     227              :     /// Get the current size of the file cache.
     228              :     #[tracing::instrument(skip_all)]
     229              :     pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
     230              :         self.query_with_retry(
     231              :             // The file cache GUC variable is in MiB, but the conversion with
     232              :             // pg_size_bytes means that the end result we get is in bytes.
     233              :             "SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit'));",
     234              :             &[],
     235              :         )
     236              :         .await
     237              :         .context("failed to query pg for file cache size")?
     238              :         .first()
     239            0 :         .ok_or_else(|| anyhow!("file cache size query returned no rows"))?
     240              :         // pg_size_bytes returns a bigint which is the same as an i64.
     241              :         .try_get::<_, i64>(0)
     242              :         // Since the size of the table is not negative, the cast is sound.
     243            0 :         .map(|bytes| bytes as u64)
     244              :         .context("failed to extract file cache size from query result")
     245              :     }
     246              : 
     247              :     /// Attempt to set the file cache size, returning the size it was actually
     248              :     /// set to.
     249              :     #[tracing::instrument(skip_all, fields(%num_bytes))]
     250              :     pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result<u64> {
     251              :         let max_bytes = self
     252              :             // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes
     253              :             // means that the end result we get is in bytes.
     254              :             .query_with_retry(
     255              :                 "SELECT pg_size_bytes(current_setting('neon.max_file_cache_size'));",
     256              :                 &[],
     257              :             )
     258              :             .await
     259              :             .context("failed to query pg for max file cache size")?
     260              :             .first()
     261            0 :             .ok_or_else(|| anyhow!("max file cache size query returned no rows"))?
     262              :             .try_get::<_, i64>(0)
     263            0 :             .map(|bytes| bytes as u64)
     264              :             .context("failed to extract max file cache size from query result")?;
     265              : 
     266              :         let max_mb = max_bytes / MiB;
     267              :         let num_mb = u64::min(num_bytes, max_bytes) / MiB;
     268              : 
     269              :         let capped = if num_bytes > max_bytes {
     270              :             " (capped by maximum size)"
     271              :         } else {
     272              :             ""
     273              :         };
     274              : 
     275              :         info!(
     276              :             size = num_mb,
     277              :             max = max_mb,
     278              :             "updating file cache size {capped}",
     279              :         );
     280              : 
     281              :         // note: even though the normal ways to get the cache size produce values with trailing "MB"
     282              :         // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format
     283              :         // it expects to set the value is "integer number of MB" without trailing units.
     284              :         // For some reason, this *really* wasn't working with normal arguments, so that's
     285              :         // why we're constructing the query here.
     286              :         self.client
     287              :             .query(
     288              :                 &format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
     289              :                 &[],
     290              :             )
     291              :             .await
     292              :             .context("failed to change file cache size limit")?;
     293              : 
     294              :         // must use pg_reload_conf to have the settings change take effect
     295              :         self.client
     296              :             .execute("SELECT pg_reload_conf();", &[])
     297              :             .await
     298              :             .context("failed to reload config")?;
     299              : 
     300              :         Ok(num_mb * MiB)
     301              :     }
     302              : }

