//! Logic for configuring and scaling the Postgres file cache.

use std::num::NonZeroU64;

use anyhow::{Context, anyhow};
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls, Row};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::MiB;

/// Manages Postgres' file cache by keeping a connection open.
#[derive(Debug)]
pub struct FileCacheState {
    client: Client,
    conn_str: String,
    pub(crate) config: FileCacheConfig,

    /// A token for cancelling spawned threads during shutdown.
    token: CancellationToken,
}

#[derive(Debug)]
pub struct FileCacheConfig {
    /// The size of the file cache, in terms of the size of the resource it consumes
    /// (currently: only memory)
    ///
    /// For example, setting `resource_multiplier = 0.75` gives the cache a target size of 75% of total
    /// resources.
    ///
    /// This value must be strictly between 0 and 1.
    resource_multiplier: f64,

    /// The required minimum amount of memory, in bytes, that must remain available
    /// after subtracting the file cache.
    ///
    /// This value must be non-zero.
    min_remaining_after_cache: NonZeroU64,

    /// Controls the rate of increase in the file cache's size as it grows from zero
    /// (when total resources equals `min_remaining_after_cache`) to the desired size based on
    /// `resource_multiplier`.
    ///
    /// A `spread_factor` of zero means that all additional resources will go to the cache until it
    /// reaches the desired size. Setting `spread_factor` to N roughly means "for every 1 byte added to
    /// the cache's size, N bytes are reserved for the rest of the system, until the cache gets to
    /// its desired size".
    ///
    /// This value must be >= 0, and must be small enough that growth along the `spread_factor` line
    /// can still reach the size given by `resource_multiplier`. For example, setting
    /// `resource_multiplier` = 0.75 but `spread_factor` = 1 would be invalid, because a
    /// `spread_factor` of 1 gives the cache only 50% of additional resources - so it could never
    /// reach the 75% desired by `resource_multiplier`.
    ///
    /// `spread_factor` is too large if `(spread_factor + 1) * resource_multiplier >= 1`.
    spread_factor: f64,
}

impl Default for FileCacheConfig {
    fn default() -> Self {
        Self {
            resource_multiplier: 0.75,
            // 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
            // memory, the kernel will just evict from its page cache, rather than e.g. killing
            // everything.
            min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
            spread_factor: 0.1,
        }
    }
}

impl FileCacheConfig {
    /// Make sure fields of the config are consistent.
    pub fn validate(&self) -> anyhow::Result<()> {
        // Single field validity
        anyhow::ensure!(
            0.0 < self.resource_multiplier && self.resource_multiplier < 1.0,
            "resource_multiplier must be between 0.0 and 1.0 exclusive, got {}",
            self.resource_multiplier
        );
        anyhow::ensure!(
            self.spread_factor >= 0.0,
            "spread_factor must be >= 0, got {}",
            self.spread_factor
        );

        // Check that `resource_multiplier` and `spread_factor` are valid w.r.t. each other.
        //
        // As shown in `calculate_cache_size`, we have two lines resulting from `resource_multiplier`
        // and `spread_factor`, respectively. They are:
        //
        //                 `total`            `min_remaining_after_cache`
        //   size = ————————————————————— - —————————————————————————————
        //           `spread_factor` + 1         `spread_factor` + 1
        //
        // and
        //
        //   size = `resource_multiplier` × total
        //
        // .. where `total` is the total resources. These are isomorphic to the typical 'y = mx + b'
        // form, with y = "size" and x = "total".
        //
        // These lines intersect at:
        //
        //            `min_remaining_after_cache`
        //   —————————————————————————————————————————————————
        //   1 - `resource_multiplier` × (`spread_factor` + 1)
        //
        // We want to ensure that this value (a) exists, and (b) is >= `min_remaining_after_cache`.
        // This is guaranteed when `resource_multiplier` × (`spread_factor` + 1) is less than 1.
        // (We also need it to be >= 0, but that's already guaranteed.)

        let intersect_factor = self.resource_multiplier * (self.spread_factor + 1.0);
        anyhow::ensure!(
            intersect_factor < 1.0,
116 0 : "incompatible resource_multipler and spread_factor"
        );
        Ok(())
    }

    /// Calculate the desired size of the cache, given the total memory.
    pub fn calculate_cache_size(&self, total: u64) -> u64 {
        // *Note*: all units are in bytes, until the very last line.
        let available = total.saturating_sub(self.min_remaining_after_cache.get());
        if available == 0 {
            return 0;
        }

        // Conversions to ensure we don't overflow from floating-point ops
        let size_from_spread =
            i64::max(0, (available as f64 / (1.0 + self.spread_factor)) as i64) as u64;

        let size_from_normal = (total as f64 * self.resource_multiplier) as u64;

        let byte_size = u64::min(size_from_spread, size_from_normal);

        // The file cache operates in units of mebibytes, so the sizes we produce should
        // be rounded to a mebibyte. We round down to be conservative.
        byte_size / MiB * MiB
    }
}
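
// Sanity-check sketch for the sizing math above. The expected values are derived by hand from
// `calculate_cache_size` under the default config (resource_multiplier = 0.75,
// min_remaining_after_cache = 256 MiB, spread_factor = 0.1); they illustrate which of the two
// lines dominates at different totals.
#[cfg(test)]
mod config_tests {
    use super::*;

    #[test]
    fn default_config_is_valid() {
        FileCacheConfig::default()
            .validate()
            .expect("default config should pass validation");
    }

    #[test]
    fn cache_size_follows_both_constraints() {
        let config = FileCacheConfig::default();

        // Nothing left over after `min_remaining_after_cache`, so the cache gets nothing.
        assert_eq!(config.calculate_cache_size(256 * MiB), 0);

        // Close to the minimum, the `spread_factor` line dominates:
        // (1024 MiB - 256 MiB) / 1.1 ≈ 698.2 MiB, rounded down to a whole mebibyte.
        assert_eq!(config.calculate_cache_size(1024 * MiB), 698 * MiB);

        // Far above the minimum, the `resource_multiplier` line dominates: 75% of 4096 MiB.
        assert_eq!(config.calculate_cache_size(4096 * MiB), 3072 * MiB);
    }
}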

impl FileCacheState {
    /// Connect to the file cache.
    #[tracing::instrument(skip_all, fields(%conn_str, ?config))]
    pub async fn new(
        conn_str: &str,
        config: FileCacheConfig,
        token: CancellationToken,
    ) -> anyhow::Result<Self> {
        config.validate().context("file cache config is invalid")?;

        info!(conn_str, "connecting to Postgres file cache");
        let client = FileCacheState::connect(conn_str, token.clone())
            .await
            .context("failed to connect to postgres file cache")?;

        let conn_str = conn_str.to_string();
        Ok(Self {
            client,
            config,
            conn_str,
            token,
        })
    }

    /// Connect to Postgres.
    ///
    /// Aborts the spawned connection task if the cancellation token is triggered. This is an
    /// associated function rather than a method because it is called from [`FileCacheState::new`].
    #[tracing::instrument(skip_all, fields(%conn_str))]
    async fn connect(conn_str: &str, token: CancellationToken) -> anyhow::Result<Client> {
        let (client, conn) = tokio_postgres::connect(conn_str, NoTls)
            .await
            .context("failed to connect to pg client")?;

        // The connection object performs the actual communication with the database,
        // so spawn it off to run on its own. See tokio-postgres docs.
        crate::spawn_with_cancel(
            token,
            |res| {
                if let Err(e) = res {
                    error!(error = format_args!("{e:#}"), "postgres error");
                }
            },
            conn,
        );

        Ok(client)
    }

    /// Execute a query with a retry if necessary.
    ///
    /// If the initial query fails, we restart the database connection and attempt
    /// it again.
    #[tracing::instrument(skip_all, fields(%statement))]
    pub async fn query_with_retry(
        &mut self,
        statement: &str,
        params: &[&(dyn ToSql + Sync)],
    ) -> anyhow::Result<Vec<Row>> {
        match self
            .client
            .query(statement, params)
            .await
            .context("failed to execute query")
        {
            Ok(rows) => Ok(rows),
            Err(e) => {
                error!(error = format_args!("{e:#}"), "postgres error -> retrying");

                let client = FileCacheState::connect(&self.conn_str, self.token.clone())
                    .await
                    .context("failed to connect to postgres file cache")?;
                info!("successfully reconnected to postgres client");

                // Replace the old client and attempt the query with the new one
                self.client = client;
                self.client
                    .query(statement, params)
                    .await
                    .context("failed to execute query a second time")
            }
        }
    }

    /// Get the current size of the file cache.
    #[tracing::instrument(skip_all)]
    pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
        self.query_with_retry(
            // The file cache GUC variable is in MiB, but the conversion with
            // pg_size_bytes means that the end result we get is in bytes.
            "SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit'));",
            &[],
        )
        .await
        .context("failed to query pg for file cache size")?
        .first()
        .ok_or_else(|| anyhow!("file cache size query returned no rows"))?
        // pg_size_bytes returns a bigint which is the same as an i64.
        .try_get::<_, i64>(0)
        // Since the cache size is never negative, the cast is sound.
        .map(|bytes| bytes as u64)
        .context("failed to extract file cache size from query result")
    }

    /// Attempt to set the file cache size, returning the size it was actually
    /// set to.
    #[tracing::instrument(skip_all, fields(%num_bytes))]
    pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result<u64> {
        let max_bytes = self
            // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes
            // means that the end result we get is in bytes.
            .query_with_retry(
                "SELECT pg_size_bytes(current_setting('neon.max_file_cache_size'));",
                &[],
            )
            .await
            .context("failed to query pg for max file cache size")?
            .first()
            .ok_or_else(|| anyhow!("max file cache size query returned no rows"))?
            .try_get::<_, i64>(0)
            .map(|bytes| bytes as u64)
            .context("failed to extract max file cache size from query result")?;

        let max_mb = max_bytes / MiB;
        let num_mb = u64::min(num_bytes, max_bytes) / MiB;

        let capped = if num_bytes > max_bytes {
            " (capped by maximum size)"
        } else {
            ""
        };

        info!(
            size = num_mb,
            max = max_mb,
            "updating file cache size {capped}",
        );

        // note: even though the normal ways to get the cache size produce values with trailing "MB"
        // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format
        // it expects to set the value is "integer number of MB" without trailing units.
        // For some reason, this *really* wasn't working with normal arguments, so that's
        // why we're constructing the query here.
        self.client
            .query(
                &format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
                &[],
            )
            .await
            .context("failed to change file cache size limit")?;

        // must use pg_reload_conf to have the settings change take effect
        self.client
            .execute("SELECT pg_reload_conf();", &[])
            .await
            .context("failed to reload config")?;

        Ok(num_mb * MiB)
    }
}
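
// A minimal usage sketch of the API above: how a caller that already knows the VM's total memory
// (in bytes) might drive a resize. The connection string, the function name, and the surrounding
// setup are placeholder assumptions for illustration, not part of the monitor's real control flow.
#[allow(dead_code)]
async fn example_resize_flow(total_memory_bytes: u64) -> anyhow::Result<()> {
    let config = FileCacheConfig::default();
    let token = CancellationToken::new();
    let mut cache = FileCacheState::new("host=localhost user=postgres", config, token).await?;

    let current = cache.get_file_cache_size().await?;
    let target = cache.config.calculate_cache_size(total_memory_bytes);
    if target != current {
        // `set_file_cache_size` returns the size actually applied, which may be capped by
        // `neon.max_file_cache_size`.
        let actual = cache.set_file_cache_size(target).await?;
        info!(current, target, actual, "resized file cache");
    }
    Ok(())
}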