TLA Line data Source code
1 : //! Logic for configuring and scaling the Postgres file cache.
2 :
3 : use std::num::NonZeroU64;
4 :
5 : use crate::MiB;
6 : use anyhow::{anyhow, Context};
7 : use tokio_postgres::{types::ToSql, Client, NoTls, Row};
8 : use tokio_util::sync::CancellationToken;
9 : use tracing::{error, info};
10 :
11 : /// Manages Postgres' file cache by keeping a connection open.
12 UBC 0 : #[derive(Debug)]
13 : pub struct FileCacheState {
14 : client: Client,
15 : conn_str: String,
16 : pub(crate) config: FileCacheConfig,
17 :
18 : /// A token for cancelling spawned threads during shutdown.
19 : token: CancellationToken,
20 : }
21 :
/// Tunables that determine how large the file cache should be for a given
/// amount of total resources. See [`FileCacheConfig::validate`] for the
/// constraints the fields must satisfy together.
#[derive(Debug)]
pub struct FileCacheConfig {
    /// Whether the file cache is *actually* held in memory (e.g. because it is
    /// written to a tmpfs or shmem file).
    ///
    /// When true, the size of the file cache counts against the memory
    /// available to the cgroup.
    pub(crate) in_memory: bool,

    /// Target size of the file cache, expressed as a fraction of the resource
    /// it consumes (currently: only memory).
    ///
    /// For example, `resource_multiplier = 0.75` gives the cache a target size
    /// of 75% of total resources.
    ///
    /// Must be strictly between 0 and 1.
    resource_multiplier: f64,

    /// Minimum amount of memory, in bytes, that must remain available once the
    /// file cache has been subtracted from the total.
    ///
    /// Must be non-zero.
    min_remaining_after_cache: NonZeroU64,

    /// Controls how quickly the cache grows from zero (when total resources
    /// equal `min_remaining_after_cache`) up to the size requested by
    /// `resource_multiplier`.
    ///
    /// A `spread_factor` of zero means every additional byte of resources goes
    /// to the cache until it reaches the desired size. A `spread_factor` of N
    /// roughly means "for every 1 byte added to the cache's size, N bytes are
    /// reserved for the rest of the system, until the cache reaches its
    /// desired size".
    ///
    /// Must be >= 0, and must still let the cache grow faster than the rate
    /// implied by `resource_multiplier`. For example, `resource_multiplier` =
    /// 0.75 with `spread_factor` = 1 is invalid: the spread alone would cap
    /// usage at 50%, so the 75% target could never be reached.
    ///
    /// `spread_factor` is too large if
    /// `(spread_factor + 1) * resource_multiplier >= 1`.
    spread_factor: f64,
}
61 :
62 : impl FileCacheConfig {
63 0 : pub fn default_in_memory() -> Self {
64 0 : Self {
65 0 : in_memory: true,
66 0 : // 75 %
67 0 : resource_multiplier: 0.75,
68 0 : // 640 MiB; (512 + 128)
69 0 : min_remaining_after_cache: NonZeroU64::new(640 * MiB).unwrap(),
70 0 : // ensure any increase in file cache size is split 90-10 with 10% to other memory
71 0 : spread_factor: 0.1,
72 0 : }
73 0 : }
74 :
75 0 : pub fn default_on_disk() -> Self {
76 0 : Self {
77 0 : in_memory: false,
78 0 : resource_multiplier: 0.75,
79 0 : // 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
80 0 : // memory, the kernel will just evict from its page cache, rather than e.g. killing
81 0 : // everything.
82 0 : min_remaining_after_cache: NonZeroU64::new(256 * MiB).unwrap(),
83 0 : spread_factor: 0.1,
84 0 : }
85 0 : }
86 :
87 : /// Make sure fields of the config are consistent.
88 0 : pub fn validate(&self) -> anyhow::Result<()> {
89 0 : // Single field validity
90 0 : anyhow::ensure!(
91 0 : 0.0 < self.resource_multiplier && self.resource_multiplier < 1.0,
92 0 : "resource_multiplier must be between 0.0 and 1.0 exclusive, got {}",
93 : self.resource_multiplier
94 : );
95 0 : anyhow::ensure!(
96 0 : self.spread_factor >= 0.0,
97 0 : "spread_factor must be >= 0, got {}",
98 : self.spread_factor
99 : );
100 :
101 : // Check that `resource_multiplier` and `spread_factor` are valid w.r.t. each other.
102 : //
103 : // As shown in `calculate_cache_size`, we have two lines resulting from `resource_multiplier` and
104 : // `spread_factor`, respectively. They are:
105 : //
106 : // `total` `min_remaining_after_cache`
107 : // size = ————————————————————— - —————————————————————————————
108 : // `spread_factor` + 1 `spread_factor` + 1
109 : //
110 : // and
111 : //
112 : // size = `resource_multiplier` × total
113 : //
114 : // .. where `total` is the total resources. These are isomorphic to the typical 'y = mx + b'
115 : // form, with y = "size" and x = "total".
116 : //
117 : // These lines intersect at:
118 : //
119 : // `min_remaining_after_cache`
120 : // ———————————————————————————————————————————————————
121 : // 1 - `resource_multiplier` × (`spread_factor` + 1)
122 : //
123 : // We want to ensure that this value (a) exists, and (b) is >= `min_remaining_after_cache`. This is
124 : // guaranteed when '`resource_multiplier` × (`spread_factor` + 1)' is less than 1.
125 : // (We also need it to be >= 0, but that's already guaranteed.)
126 :
127 0 : let intersect_factor = self.resource_multiplier * (self.spread_factor + 1.0);
128 0 : anyhow::ensure!(
129 0 : intersect_factor < 1.0,
130 0 : "incompatible resource_multipler and spread_factor"
131 : );
132 0 : Ok(())
133 0 : }
134 :
135 : /// Calculate the desired size of the cache, given the total memory
136 0 : pub fn calculate_cache_size(&self, total: u64) -> u64 {
137 0 : // *Note*: all units are in bytes, until the very last line.
138 0 : let available = total.saturating_sub(self.min_remaining_after_cache.get());
139 0 : if available == 0 {
140 0 : return 0;
141 0 : }
142 0 :
143 0 : // Conversions to ensure we don't overflow from floating-point ops
144 0 : let size_from_spread =
145 0 : i64::max(0, (available as f64 / (1.0 + self.spread_factor)) as i64) as u64;
146 0 :
147 0 : let size_from_normal = (total as f64 * self.resource_multiplier) as u64;
148 0 :
149 0 : let byte_size = u64::min(size_from_spread, size_from_normal);
150 0 :
151 0 : // The file cache operates in units of mebibytes, so the sizes we produce should
152 0 : // be rounded to a mebibyte. We round down to be conservative.
153 0 : byte_size / MiB * MiB
154 0 : }
155 : }
156 :
157 : impl FileCacheState {
158 : /// Connect to the file cache.
159 0 : #[tracing::instrument(skip_all, fields(%conn_str, ?config))]
160 : pub async fn new(
161 : conn_str: &str,
162 : config: FileCacheConfig,
163 : token: CancellationToken,
164 : ) -> anyhow::Result<Self> {
165 : config.validate().context("file cache config is invalid")?;
166 :
167 0 : info!(conn_str, "connecting to Postgres file cache");
168 : let client = FileCacheState::connect(conn_str, token.clone())
169 : .await
170 : .context("failed to connect to postgres file cache")?;
171 :
172 : let conn_str = conn_str.to_string();
173 : Ok(Self {
174 : client,
175 : config,
176 : conn_str,
177 : token,
178 : })
179 : }
180 :
181 : /// Connect to Postgres.
182 : ///
183 : /// Aborts the spawned thread if the kill signal is received. This is not
184 : /// a method as it is called in [`FileCacheState::new`].
185 0 : #[tracing::instrument(skip_all, fields(%conn_str))]
186 : async fn connect(conn_str: &str, token: CancellationToken) -> anyhow::Result<Client> {
187 : let (client, conn) = tokio_postgres::connect(conn_str, NoTls)
188 : .await
189 : .context("failed to connect to pg client")?;
190 :
191 : // The connection object performs the actual communication with the database,
192 : // so spawn it off to run on its own. See tokio-postgres docs.
193 : crate::spawn_with_cancel(
194 : token,
195 : |res| {
196 0 : if let Err(error) = res {
197 0 : error!(%error, "postgres error")
198 0 : }
199 0 : },
200 : conn,
201 : );
202 :
203 : Ok(client)
204 : }
205 :
206 : /// Execute a query with a retry if necessary.
207 : ///
208 : /// If the initial query fails, we restart the database connection and attempt
209 : /// if again.
210 0 : #[tracing::instrument(skip_all, fields(%statement))]
211 : pub async fn query_with_retry(
212 : &mut self,
213 : statement: &str,
214 : params: &[&(dyn ToSql + Sync)],
215 : ) -> anyhow::Result<Vec<Row>> {
216 : match self
217 : .client
218 : .query(statement, params)
219 : .await
220 : .context("failed to execute query")
221 : {
222 : Ok(rows) => Ok(rows),
223 : Err(e) => {
224 0 : error!(error = ?e, "postgres error: {e} -> retrying");
225 :
226 : let client = FileCacheState::connect(&self.conn_str, self.token.clone())
227 : .await
228 : .context("failed to connect to postgres file cache")?;
229 0 : info!("successfully reconnected to postgres client");
230 :
231 : // Replace the old client and attempt the query with the new one
232 : self.client = client;
233 : self.client
234 : .query(statement, params)
235 : .await
236 : .context("failed to execute query a second time")
237 : }
238 : }
239 : }
240 :
241 : /// Get the current size of the file cache.
242 0 : #[tracing::instrument(skip_all)]
243 : pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
244 : self.query_with_retry(
245 : // The file cache GUC variable is in MiB, but the conversion with
246 : // pg_size_bytes means that the end result we get is in bytes.
247 : "SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit'));",
248 : &[],
249 : )
250 : .await
251 : .context("failed to query pg for file cache size")?
252 : .first()
253 0 : .ok_or_else(|| anyhow!("file cache size query returned no rows"))?
254 : // pg_size_bytes returns a bigint which is the same as an i64.
255 : .try_get::<_, i64>(0)
256 : // Since the size of the table is not negative, the cast is sound.
257 0 : .map(|bytes| bytes as u64)
258 : .context("failed to extract file cache size from query result")
259 : }
260 :
261 : /// Attempt to set the file cache size, returning the size it was actually
262 : /// set to.
263 0 : #[tracing::instrument(skip_all, fields(%num_bytes))]
264 : pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result<u64> {
265 : let max_bytes = self
266 : // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes
267 : // means that the end result we get is in bytes.
268 : .query_with_retry(
269 : "SELECT pg_size_bytes(current_setting('neon.max_file_cache_size'));",
270 : &[],
271 : )
272 : .await
273 : .context("failed to query pg for max file cache size")?
274 : .first()
275 0 : .ok_or_else(|| anyhow!("max file cache size query returned no rows"))?
276 : .try_get::<_, i64>(0)
277 0 : .map(|bytes| bytes as u64)
278 : .context("failed to extract max file cache size from query result")?;
279 :
280 : let max_mb = max_bytes / MiB;
281 : let num_mb = u64::min(num_bytes, max_bytes) / MiB;
282 :
283 : let capped = if num_bytes > max_bytes {
284 : " (capped by maximum size)"
285 : } else {
286 : ""
287 : };
288 :
289 0 : info!(
290 0 : size = num_mb,
291 0 : max = max_mb,
292 0 : "updating file cache size {capped}",
293 0 : );
294 :
295 : // note: even though the normal ways to get the cache size produce values with trailing "MB"
296 : // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format
297 : // it expects to set the value is "integer number of MB" without trailing units.
298 : // For some reason, this *really* wasn't working with normal arguments, so that's
299 : // why we're constructing the query here.
300 : self.client
301 : .query(
302 : &format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
303 : &[],
304 : )
305 : .await
306 : .context("failed to change file cache size limit")?;
307 :
308 : // must use pg_reload_conf to have the settings change take effect
309 : self.client
310 : .execute("SELECT pg_reload_conf();", &[])
311 : .await
312 : .context("failed to reload config")?;
313 :
314 : Ok(num_mb * MiB)
315 : }
316 : }
|