//! Logic for configuring and scaling the Postgres file cache.

use std::num::NonZeroU64;

use crate::MiB;
use anyhow::{anyhow, Context};
use tokio_postgres::{types::ToSql, Client, NoTls, Row};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

/// Manages Postgres' file cache by keeping a connection open.
#[derive(Debug)]
pub struct FileCacheState {
    client: Client,
    conn_str: String,
    pub(crate) config: FileCacheConfig,

    /// A token for cancelling spawned threads during shutdown.
    token: CancellationToken,
}

#[derive(Debug)]
pub struct FileCacheConfig {
    /// The size of the file cache, in terms of the size of the resource it consumes
    /// (currently: only memory).
    ///
    /// For example, setting `resource_multiplier = 0.75` gives the cache a target size of 75% of total
    /// resources.
    ///
    /// This value must be strictly between 0 and 1.
    resource_multiplier: f64,

    /// The required minimum amount of memory, in bytes, that must remain available
    /// after subtracting the file cache.
    ///
    /// This value must be non-zero.
    min_remaining_after_cache: NonZeroU64,

    /// Controls the rate of increase in the file cache's size as it grows from zero
    /// (when total resources equals `min_remaining_after_cache`) to the desired size based on
    /// `resource_multiplier`.
    ///
    /// A `spread_factor` of zero means that all additional resources will go to the cache until it
    /// reaches the desired size. Setting `spread_factor` to N roughly means "for every 1 byte added to
    /// the cache's size, N bytes are reserved for the rest of the system, until the cache gets to
    /// its desired size".
    ///
    /// This value must be >= 0, and must be small enough that the cache still grows faster than
    /// `resource_multiplier` alone would dictate; otherwise the cache can never reach its desired
    /// size. For example, setting `resource_multiplier` = 0.75 but `spread_factor` = 1 would be
    /// invalid, because `spread_factor` would induce only 50% usage - never reaching the 75%
    /// as desired by `resource_multiplier`.
    ///
    /// `spread_factor` is too large if `(spread_factor + 1) * resource_multiplier >= 1`.
    spread_factor: f64,
}

impl Default for FileCacheConfig {
    fn default() -> Self {
        Self {
            resource_multiplier: 0.75,
            // 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
            // memory, the kernel will just evict from its page cache, rather than e.g. killing
            // everything.
            min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
            spread_factor: 0.1,
        }
    }
}

impl FileCacheConfig {
    /// Make sure fields of the config are consistent.
    pub fn validate(&self) -> anyhow::Result<()> {
        // Single field validity
        anyhow::ensure!(
            0.0 < self.resource_multiplier && self.resource_multiplier < 1.0,
            "resource_multiplier must be between 0.0 and 1.0 exclusive, got {}",
            self.resource_multiplier
        );
        anyhow::ensure!(
            self.spread_factor >= 0.0,
            "spread_factor must be >= 0, got {}",
            self.spread_factor
        );

        // Check that `resource_multiplier` and `spread_factor` are valid w.r.t. each other.
        //
        // As shown in `calculate_cache_size`, we have two lines resulting from `resource_multiplier` and
        // `spread_factor`, respectively. They are:
        //
        //                  `total`               `min_remaining_after_cache`
        //   size = ————————————————————————— - —————————————————————————————
        //            `spread_factor` + 1             `spread_factor` + 1
        //
        // and
        //
        //   size = `resource_multiplier` × total
        //
        // .. where `total` is the total resources. These are isomorphic to the typical 'y = mx + b'
        // form, with y = "size" and x = "total".
        //
        // These lines intersect at:
        //
        //              `min_remaining_after_cache`
        //   ———————————————————————————————————————————————————
        //    1 - `resource_multiplier` × (`spread_factor` + 1)
        //
        // We want to ensure that this value (a) exists, and (b) is >= `min_remaining_after_cache`. This is
        // guaranteed when '`resource_multiplier` × (`spread_factor` + 1)' is less than 1.
        // (We also need it to be >= 0, but that's already guaranteed.)
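
        // As a concrete illustration (using the default config values, which are not read by
        // this function): with `resource_multiplier` = 0.75 and `spread_factor` = 0.1, the
        // denominator is 1 - 0.75 × 1.1 = 0.175, so the lines intersect at
        // 256 MiB / 0.175 ≈ 1463 MiB of total memory. Below that total, the `spread_factor`
        // line is the smaller of the two; above it, the `resource_multiplier` line takes over.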
        let intersect_factor = self.resource_multiplier * (self.spread_factor + 1.0);
        anyhow::ensure!(
            intersect_factor < 1.0,
            "incompatible resource_multiplier and spread_factor"
        );
        Ok(())
    }

    /// Calculate the desired size of the cache, given the total memory
    pub fn calculate_cache_size(&self, total: u64) -> u64 {
        // *Note*: all units are in bytes, until the very last line.
        let available = total.saturating_sub(self.min_remaining_after_cache.get());
        if available == 0 {
            return 0;
        }

        // Conversions to ensure we don't overflow from floating-point ops
        let size_from_spread =
            i64::max(0, (available as f64 / (1.0 + self.spread_factor)) as i64) as u64;

        let size_from_normal = (total as f64 * self.resource_multiplier) as u64;

        let byte_size = u64::min(size_from_spread, size_from_normal);

        // The file cache operates in units of mebibytes, so the sizes we produce should
        // be rounded to a mebibyte. We round down to be conservative.
        byte_size / MiB * MiB
    }
}
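
// A minimal sketch of how the sizing math behaves under the default config. This test module
// is illustrative and not part of the original source; the 4 GiB total is an assumed example
// value, not something read from the system.
#[cfg(test)]
mod sizing_sketch {
    use super::*;

    #[test]
    fn default_config_sizing() {
        let config = FileCacheConfig::default();
        config.validate().unwrap();

        // 4096 MiB total: available = 4096 - 256 = 3840 MiB; the spread line gives
        // 3840 / 1.1 ≈ 3490 MiB, the multiplier line gives 4096 × 0.75 = 3072 MiB,
        // and the smaller value (3072 MiB) wins.
        assert_eq!(config.calculate_cache_size(4096 * MiB), 3072 * MiB);

        // At or below `min_remaining_after_cache`, the cache gets nothing.
        assert_eq!(config.calculate_cache_size(256 * MiB), 0);
    }
}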

impl FileCacheState {
    /// Connect to the file cache.
    #[tracing::instrument(skip_all, fields(%conn_str, ?config))]
    pub async fn new(
        conn_str: &str,
        config: FileCacheConfig,
        token: CancellationToken,
    ) -> anyhow::Result<Self> {
        config.validate().context("file cache config is invalid")?;

        info!(conn_str, "connecting to Postgres file cache");
        let client = FileCacheState::connect(conn_str, token.clone())
            .await
            .context("failed to connect to postgres file cache")?;

        let conn_str = conn_str.to_string();
        Ok(Self {
            client,
            config,
            conn_str,
            token,
        })
    }

    /// Connect to Postgres.
    ///
    /// Aborts the spawned thread if the kill signal is received. This is an associated
    /// function rather than a method because it is called in [`FileCacheState::new`],
    /// before a `Self` value exists.
    #[tracing::instrument(skip_all, fields(%conn_str))]
    async fn connect(conn_str: &str, token: CancellationToken) -> anyhow::Result<Client> {
        let (client, conn) = tokio_postgres::connect(conn_str, NoTls)
            .await
            .context("failed to connect to pg client")?;

        // The connection object performs the actual communication with the database,
        // so spawn it off to run on its own. See tokio-postgres docs.
        crate::spawn_with_cancel(
            token,
            |res| {
                if let Err(error) = res {
                    error!(%error, "postgres error")
                }
            },
            conn,
        );

        Ok(client)
    }

    /// Execute a query with a retry if necessary.
    ///
    /// If the initial query fails, we restart the database connection and attempt
    /// it again.
    #[tracing::instrument(skip_all, fields(%statement))]
    pub async fn query_with_retry(
        &mut self,
        statement: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> anyhow::Result<Vec<Row>> {
        match self
            .client
            .query(statement, params)
            .await
            .context("failed to execute query")
        {
            Ok(rows) => Ok(rows),
            Err(e) => {
                error!(error = ?e, "postgres error: {e} -> retrying");

                let client = FileCacheState::connect(&self.conn_str, self.token.clone())
                    .await
                    .context("failed to connect to postgres file cache")?;
                info!("successfully reconnected to postgres client");

                // Replace the old client and attempt the query with the new one
                self.client = client;
                self.client
                    .query(statement, params)
                    .await
                    .context("failed to execute query a second time")
            }
        }
    }

    /// Get the current size of the file cache.
    #[tracing::instrument(skip_all)]
    pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
        self.query_with_retry(
            // The file cache GUC variable is in MiB, but the conversion with
            // pg_size_bytes means that the end result we get is in bytes.
            "SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit'));",
            &[],
        )
        .await
        .context("failed to query pg for file cache size")?
        .first()
        .ok_or_else(|| anyhow!("file cache size query returned no rows"))?
        // pg_size_bytes returns a bigint which is the same as an i64.
        .try_get::<_, i64>(0)
        // Since the cache size is not negative, the cast is sound.
        .map(|bytes| bytes as u64)
        .context("failed to extract file cache size from query result")
    }

    /// Attempt to set the file cache size, returning the size it was actually
    /// set to.
    #[tracing::instrument(skip_all, fields(%num_bytes))]
    pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result<u64> {
        let max_bytes = self
            // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes
            // means that the end result we get is in bytes.
            .query_with_retry(
                "SELECT pg_size_bytes(current_setting('neon.max_file_cache_size'));",
                &[],
            )
            .await
            .context("failed to query pg for max file cache size")?
            .first()
            .ok_or_else(|| anyhow!("max file cache size query returned no rows"))?
            .try_get::<_, i64>(0)
            .map(|bytes| bytes as u64)
            .context("failed to extract max file cache size from query result")?;

        let max_mb = max_bytes / MiB;
        let num_mb = u64::min(num_bytes, max_bytes) / MiB;

        let capped = if num_bytes > max_bytes {
            " (capped by maximum size)"
        } else {
            ""
        };

        info!(
            size = num_mb,
            max = max_mb,
            "updating file cache size {capped}",
        );

        // note: even though the normal ways to get the cache size produce values with trailing "MB"
        // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format
        // it expects to set the value is "integer number of MB" without trailing units.
        // For some reason, this *really* wasn't working with normal arguments, so that's
        // why we're constructing the query here.
        self.client
            .query(
                &format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
                &[],
            )
            .await
            .context("failed to change file cache size limit")?;

        // must use pg_reload_conf to have the settings change take effect
        self.client
            .execute("SELECT pg_reload_conf();", &[])
            .await
            .context("failed to reload config")?;

        Ok(num_mb * MiB)
    }
}
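
// A minimal usage sketch, not part of the original module: reconcile the file cache with a
// target derived from total memory. The connection string and `total_memory_bytes` are
// illustrative assumptions; real callers wire these up from their own configuration.
#[allow(dead_code)]
async fn resize_file_cache_sketch(total_memory_bytes: u64) -> anyhow::Result<()> {
    let token = CancellationToken::new();
    let mut cache = FileCacheState::new(
        "host=localhost port=5432 user=cloud_admin dbname=postgres",
        FileCacheConfig::default(),
        token,
    )
    .await?;

    let current = cache.get_file_cache_size().await?;
    let target = cache.config.calculate_cache_size(total_memory_bytes);
    if target != current {
        // `set_file_cache_size` returns the size actually applied, which may be capped
        // by `neon.max_file_cache_size`.
        let applied = cache.set_file_cache_size(target).await?;
        info!(applied, "resized file cache");
    }
    Ok(())
}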