//! Exposes the `Runner`, which handles messages received from the agent and
//! sends upscale requests.
//!
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
//! all functionality.

use std::fmt::Debug;
use std::time::{Duration, Instant};

use anyhow::{Context, bail};
use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{Args, MiB, bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel};

/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
#[derive(Debug)]
pub struct Runner {
    config: Config,
    filecache: Option<FileCacheState>,
    cgroup: Option<CgroupState>,
    dispatcher: Dispatcher,

    /// We "mint" new message ids by incrementing this counter and taking the value.
    ///
    /// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated
    /// by us vs the autoscaler-agent.
    counter: usize,

    last_upscale_request_at: Option<Instant>,

    /// A signal to kill the main thread produced by `self.run()`. This is triggered
    /// when the server receives a new connection. When the thread receives the
    /// signal on this channel, it will shut down gracefully.
    kill: broadcast::Receiver<()>,
}
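
// Illustrative sketch (not part of the original source) of the ID-minting scheme
// described on the `counter` field above: the monitor starts the counter at 1 and
// only ever steps it by two, so the IDs it mints stay odd and cannot collide with
// the (presumably even) IDs assigned by the autoscaler-agent. In `run()` this
// looks like:
//
//     self.counter += 2;      // stays odd
//     let id = self.counter;  // id for the next outbound message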

#[derive(Debug)]
struct CgroupState {
    watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
    /// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
    /// requests.
    threshold: u64,
}

/// Configuration for a `Runner`
#[derive(Debug)]
pub struct Config {
    /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses before
    /// handing out the rest to userspace. This value is the estimated difference between the
    /// *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`.
    ///
    /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM
    /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)".
    ///
    /// We only use `sys_buffer_bytes` when calculating the system memory from the *external* memory
    /// size, rather than the self-reported memory size, according to the kernel.
    ///
    /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
    /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field
    /// should be removed once we have a better solution there.
    sys_buffer_bytes: u64,

    /// Minimum fraction of total system memory that must remain available *above* the cgroup
    /// threshold; in other words, a ceiling on the threshold, enforcing that at least
    /// `cgroup_min_overhead_fraction` of the total memory remains beyond the threshold.
    ///
    /// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
    /// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
    /// memory.
    ///
    /// The default value of `0.15` means that we *guarantee* sending upscale requests if the
    /// cgroup is using more than 85% of total memory.
    cgroup_min_overhead_fraction: f64,

    cgroup_downscale_threshold_buffer_bytes: u64,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            sys_buffer_bytes: 100 * MiB,
            cgroup_min_overhead_fraction: 0.15,
            cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
        }
    }
}

impl Config {
    fn cgroup_threshold(&self, total_mem: u64) -> u64 {
        // We want our threshold to be met gracefully instead of letting postgres get OOM-killed
        // (or if there's room, spilling to swap).
        // So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
        // remaining above the threshold.
        (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64
    }
}
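
// Not part of the original module: a minimal sketch (hypothetical module and test
// names) illustrating the threshold arithmetic documented on
// `cgroup_min_overhead_fraction`. With the default fraction of 0.15, the threshold
// works out to roughly 85% of total memory, leaving at least 15% free above it.
#[cfg(test)]
mod cgroup_threshold_sketch {
    use super::Config;

    #[test]
    fn default_threshold_leaves_min_overhead() {
        let config = Config::default();
        let total_mem: u64 = 8 * 1024 * 1024 * 1024; // 8 GiB, in bytes

        // threshold = total_mem * (1.0 - 0.15), i.e. ~85% of total memory...
        let threshold = config.cgroup_threshold(total_mem);
        assert!(threshold < total_mem);

        // ...so at least 15% of total memory remains above the threshold.
        assert!(total_mem - threshold >= (total_mem as f64 * 0.15) as u64);
    }
}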

impl Runner {
    /// Create a new monitor.
    #[tracing::instrument(skip_all, fields(?config, ?args))]
    pub async fn new(
        config: Config,
        args: &Args,
        ws: WebSocket,
        kill: broadcast::Receiver<()>,
        token: CancellationToken,
    ) -> anyhow::Result<Runner> {
        anyhow::ensure!(
            config.sys_buffer_bytes != 0,
            "invalid monitor Config: sys_buffer_bytes cannot be 0"
        );

        let dispatcher = Dispatcher::new(ws)
            .await
            .context("error creating new dispatcher")?;

        let mut state = Runner {
            config,
            filecache: None,
            cgroup: None,
            dispatcher,
            counter: 1, // NB: must be odd, see the comment about the field for more.
            last_upscale_request_at: None,
            kill,
        };

        let mem = get_total_system_memory();

        if let Some(connstr) = &args.pgconnstr {
            info!("initializing file cache");
            let config = FileCacheConfig::default();

            let mut file_cache = FileCacheState::new(connstr, config, token.clone())
                .await
                .context("failed to create file cache")?;

            let size = file_cache
                .get_file_cache_size()
                .await
                .context("error getting file cache size")?;

            let new_size = file_cache.config.calculate_cache_size(mem);
            info!(
                initial = bytes_to_mebibytes(size),
                new = bytes_to_mebibytes(new_size),
                "setting initial file cache size",
            );

            // note: even if size == new_size, we want to explicitly set it, just
            // to make sure that we have the permissions to do so
            let actual_size = file_cache
                .set_file_cache_size(new_size)
                .await
                .context("failed to set file cache size, possibly due to inadequate permissions")?;
            if actual_size != new_size {
                info!("file cache size actually got set to {actual_size}")
            }

            state.filecache = Some(file_cache);
        }

        if let Some(name) = &args.cgroup {
            // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
            // now, and then set limits later.
            info!("initializing cgroup");

            let cgroup =
                CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;

            let init_value = cgroup::MemoryHistory {
                avg_non_reclaimable: 0,
                samples_count: 0,
                samples_span: Duration::ZERO,
            };
            let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));

            spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
                cgroup.watch(hist_tx).await
            });

            let threshold = state.config.cgroup_threshold(mem);
            info!(threshold, "set initial cgroup threshold",);

            state.cgroup = Some(CgroupState {
                watcher: hist_rx,
                threshold,
            });
        }

        Ok(state)
    }

    /// Attempt to downscale filecache + cgroup
    #[tracing::instrument(skip_all, fields(?target))]
    pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
        // Nothing to adjust
        if self.cgroup.is_none() && self.filecache.is_none() {
            info!("no action needed for downscale (no cgroup or file cache enabled)");
            return Ok((
                true,
                "monitor is not managing cgroup or file cache".to_string(),
            ));
        }

        let requested_mem = target.mem;
        let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
        let expected_file_cache_size = self
            .filecache
            .as_ref()
            .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
            .unwrap_or(0);
        if let Some(cgroup) = &self.cgroup {
            let (last_time, last_history) = *cgroup.watcher.borrow();

            // NB: The ordering of these conditions is intentional. During startup, we should deny
            // downscaling until we have enough information to determine that it's safe to do so
            // (i.e. enough samples have come in). But if it's been a while and we *still* haven't
            // received any information, we should *fail* instead of just denying downscaling.
            //
            // `last_time` is set to `Instant::now()` on startup, so checking `last_time.elapsed()`
            // serves double-duty: it trips if we haven't received *any* metrics for long enough,
            // OR if we haven't received metrics *recently enough*.
            //
            // TODO: make the duration here configurable.
            if last_time.elapsed() > Duration::from_secs(5) {
                bail!(
                    "haven't gotten cgroup memory stats recently enough to determine downscaling information"
                );
            } else if last_history.samples_count <= 1 {
                let status = "haven't received enough cgroup memory stats yet";
                info!(status, "discontinuing downscale");
                return Ok((false, status.to_owned()));
            }

            let new_threshold = self.config.cgroup_threshold(usable_system_memory);

            let current = last_history.avg_non_reclaimable;

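            // Illustrative numbers (not from the original source, using the defaults
            // above): for a 2048 MiB downscale target, usable memory is 2048 - 100 =
            // 1948 MiB and the new threshold is 0.85 * 1948 = ~1656 MiB, so with the
            // 100 MiB buffer the check below refuses the downscale whenever average
            // non-reclaimable usage exceeds roughly 1556 MiB.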
            if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
                let status = format!(
                    "{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
                    "calculated memory threshold too low",
                    bytes_to_mebibytes(new_threshold),
                    bytes_to_mebibytes(current),
                    bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
                );

                info!(status, "discontinuing downscale");

                return Ok((false, status));
            }
        }

        // The downscaling has been approved. Downscale the file cache, then the cgroup.
        let mut status = vec![];
        if let Some(file_cache) = &mut self.filecache {
            let actual_usage = file_cache
                .set_file_cache_size(expected_file_cache_size)
                .await
                .context("failed to set file cache size")?;
            let message = format!(
                "set file cache size to {} MiB",
                bytes_to_mebibytes(actual_usage),
            );
            info!("downscale: {message}");
            status.push(message);
        }

        if let Some(cgroup) = &mut self.cgroup {
            let new_threshold = self.config.cgroup_threshold(usable_system_memory);

            let message = format!(
                "set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
                bytes_to_mebibytes(cgroup.threshold),
                bytes_to_mebibytes(new_threshold),
                bytes_to_mebibytes(usable_system_memory)
            );
            cgroup.threshold = new_threshold;
            info!("downscale: {message}");
            status.push(message);
        }

        // TODO: make this status thing less jank
        let status = status.join("; ");
        Ok((true, status))
    }

    /// Handle new resources
    #[tracing::instrument(skip_all, fields(?resources))]
    pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
        if self.filecache.is_none() && self.cgroup.is_none() {
            info!("no action needed for upscale (no cgroup or file cache enabled)");
            return Ok(());
        }

        let new_mem = resources.mem;
        let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);

        if let Some(file_cache) = &mut self.filecache {
            let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
            info!(
                target = bytes_to_mebibytes(expected_usage),
                total = bytes_to_mebibytes(new_mem),
                "updating file cache size",
            );

            let actual_usage = file_cache
                .set_file_cache_size(expected_usage)
                .await
                .context("failed to set file cache size")?;

            if actual_usage != expected_usage {
                warn!(
324 : "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib",
                    bytes_to_mebibytes(expected_usage),
                    bytes_to_mebibytes(actual_usage)
                )
            }
        }

        if let Some(cgroup) = &mut self.cgroup {
            let new_threshold = self.config.cgroup_threshold(usable_system_memory);

            info!(
                "set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
                bytes_to_mebibytes(cgroup.threshold),
                bytes_to_mebibytes(new_threshold),
                bytes_to_mebibytes(usable_system_memory)
            );
            cgroup.threshold = new_threshold;
        }

        Ok(())
    }

    /// Take in a message and perform some action, such as downscaling or upscaling,
    /// and return a message to be sent back.
    #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
    pub async fn process_message(
        &mut self,
        InboundMsg { inner, id }: InboundMsg,
    ) -> anyhow::Result<Option<OutboundMsg>> {
        match inner {
            InboundMsgKind::UpscaleNotification { granted } => {
                self.handle_upscale(granted)
                    .await
                    .context("failed to handle upscale")?;
                Ok(Some(OutboundMsg::new(
                    OutboundMsgKind::UpscaleConfirmation {},
                    id,
                )))
            }
            InboundMsgKind::DownscaleRequest { target } => self
                .try_downscale(target)
                .await
                .context("failed to downscale")
                .map(|(ok, status)| {
                    Some(OutboundMsg::new(
                        OutboundMsgKind::DownscaleResult { ok, status },
                        id,
                    ))
                }),
            InboundMsgKind::InvalidMessage { error } => {
                warn!(
                    error = format_args!("{error:#}"),
                    id, "received notification of an invalid message we sent"
                );
                Ok(None)
            }
            InboundMsgKind::InternalError { error } => {
                warn!(
                    error = format_args!("{error:#}"),
                    id, "agent experienced an internal error"
                );
                Ok(None)
            }
            InboundMsgKind::HealthCheck {} => {
                Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
            }
        }
    }

    // TODO: don't propagate errors, probably just warn!?
    #[tracing::instrument(skip_all)]
    pub async fn run(&mut self) -> anyhow::Result<()> {
        info!("starting dispatcher");
        loop {
            tokio::select! {
                signal = self.kill.recv() => {
                    match signal {
                        Ok(()) => return Ok(()),
                        Err(e) => bail!("failed to receive kill signal: {e}")
                    }
                }

                // New memory stats from the cgroup, *may* need to request upscaling, if we've
                // exceeded the threshold
                result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
                    result.context("failed to receive from cgroup memory stats watcher")?;

                    let cgroup = self.cgroup.as_ref().unwrap();

                    let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();

                    // If we haven't exceeded the threshold, then we're all ok
                    if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
                        continue;
                    }

                    // Otherwise, we generally want upscaling. But, if it's been less than 1 second
                    // since the last time we requested upscaling, ignore the event, to avoid
                    // spamming the agent.
                    if let Some(t) = self.last_upscale_request_at {
                        let elapsed = t.elapsed();
                        if elapsed < Duration::from_secs(1) {
                            // *Ideally* we'd like to log here that we're ignoring the fact the
                            // memory stats are too high, but in practice this can result in
                            // spamming the logs with repetitive messages about ignoring the signal
                            //
                            // See https://github.com/neondatabase/neon/issues/5865 for more.
                            continue;
                        }
                    }

                    self.last_upscale_request_at = Some(Instant::now());

                    info!(
                        avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
                        threshold = bytes_to_mebibytes(cgroup.threshold),
                        "cgroup memory stats are high enough to upscale, requesting upscale",
                    );

                    self.counter += 2; // Increment, preserving parity (i.e. keep the
                                       // counter odd). See the field comment for more.
                    self.dispatcher
                        .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
                        .await
                        .context("failed to send message")?;
                },

                // there is a message from the agent
                msg = self.dispatcher.source.next() => {
                    if let Some(msg) = msg {
                        match &msg {
                            Ok(msg) => {
                                let message: InboundMsg = match msg {
                                    Message::Text(text) => {
                                        serde_json::from_str(text).context("failed to deserialize text message")?
                                    }
                                    other => {
                                        warn!(
                                            // Don't use 'message' as a key as the
                                            // string also uses that for its key
                                            msg = ?other,
465 : "problem processing incoming message: agent should only send text messages but received different type"
                                        );
                                        continue
                                    },
                                };

                                if matches!(&message.inner, InboundMsgKind::HealthCheck { .. }) {
                                    debug!(?msg, "received message");
                                } else {
                                    info!(?msg, "received message");
                                }

                                let out = match self.process_message(message.clone()).await {
                                    Ok(Some(out)) => out,
                                    Ok(None) => continue,
                                    Err(e) => {
                                        // use {:#} for our logging because the display impl only
                                        // gives the outermost cause, and the debug impl
                                        // pretty-prints the error, whereas {:#} contains all the
                                        // causes, but is compact (no newlines).
                                        warn!(error = format_args!("{e:#}"), "error handling message");
                                        OutboundMsg::new(
                                            OutboundMsgKind::InternalError {
                                                error: e.to_string(),
                                            },
                                            message.id
                                        )
                                    }
                                };

                                self.dispatcher
                                    .send(out)
                                    .await
                                    .context("failed to send message")?;
                            }
                            Err(e) => warn!(
                                error = format_args!("{e:#}"),
                                msg = ?msg,
                                "received error message"
                            ),
                        }
                    } else {
                        anyhow::bail!("dispatcher connection closed")
                    }
                }
            }
        }
    }
}