//! Exposes the `Runner`, which handles messages received from the agent and
//! sends upscale requests.
//!
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
//! all functionality.
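//!
//! A rough sketch of the intended call flow, assuming the server has already accepted a
//! websocket connection (variable names here are hypothetical; the real wiring lives in
//! the server setup):
//!
//! ```ignore
//! let (_kill_tx, kill_rx) = tokio::sync::broadcast::channel(1);
//! let token = tokio_util::sync::CancellationToken::new();
//! let mut runner = Runner::new(Config::default(), &args, ws, kill_rx, token).await?;
//! runner.run().await?;
//! ```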

use std::fmt::Debug;
use std::time::{Duration, Instant};

use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};

/// Central struct that interacts with the agent, dispatcher, and cgroup to handle
/// signals from the agent.
#[derive(Debug)]
pub struct Runner {
    config: Config,
    filecache: Option<FileCacheState>,
    cgroup: Option<CgroupState>,
    dispatcher: Dispatcher,

    /// We "mint" new message ids by incrementing this counter and taking the value.
    ///
    /// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated
    /// by us vs the autoscaler-agent.
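    ///
    /// For example, our IDs run 1, 3, 5, ... (starting at 1 and incrementing by 2 on each
    /// request), which presumably leaves the even IDs to the autoscaler-agent.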
    counter: usize,

    last_upscale_request_at: Option<Instant>,

    /// A signal to kill the main thread produced by `self.run()`. This is triggered
    /// when the server receives a new connection. When the thread receives the
    /// signal off this channel, it will gracefully shut down.
    kill: broadcast::Receiver<()>,
}

#[derive(Debug)]
struct CgroupState {
    watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
    /// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
    /// requests.
    threshold: u64,
}

/// Configuration for a `Runner`
#[derive(Debug)]
pub struct Config {
    /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses
    /// before handing out the rest to userspace. This value is the estimated difference between
    /// the *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`.
    ///
    /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM
    /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)".
    ///
    /// We only use `sys_buffer_bytes` when calculating the system memory from the *external*
    /// memory size, rather than the memory size self-reported by the kernel.
    ///
    /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
    /// upscale resource amounts (because we might not *actually* have been upscaled yet). This
    /// field should be removed once we have a better solution there.
    sys_buffer_bytes: u64,

    /// Minimum fraction of total system memory reserved *beyond* the cgroup threshold; in
    /// other words, providing a ceiling for the highest value of the threshold by enforcing that
    /// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
    /// threshold.
    ///
    /// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
    /// the threshold, so the value of the cgroup threshold would always be capped at 90% of
    /// total memory.
    ///
    /// The default value of `0.15` means that we *guarantee* sending upscale requests if the
    /// cgroup is using more than 85% of total memory (even if we're *not* separately reserving
    /// memory for the file cache).
    cgroup_min_overhead_fraction: f64,

    cgroup_downscale_threshold_buffer_bytes: u64,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            sys_buffer_bytes: 100 * MiB,
            cgroup_min_overhead_fraction: 0.15,
            cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
        }
    }
}

impl Config {
    fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 {
        // If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup,
        // and thus be non-reclaimable, so we should allow for additional memory usage.
        //
        // If the file cache sits on disk, our desired stable system state is for it to be fully
        // page cached (its contents should only be paged to/from disk in situations where we
        // can't upscale fast enough). Page-cached memory is reclaimable, so we need to lower the
        // threshold for non-reclaimable memory so we scale up *before* the kernel starts paging
        // out the file cache.
        let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size);

        // Even if we're not separately making room for the file cache (if it's in tmpfs), we
        // still want our threshold to be met gracefully instead of letting postgres get
        // OOM-killed. So we guarantee that there's at least `cgroup_min_overhead_fraction` of
        // total memory remaining above the threshold.
        let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64;

        memory_remaining_for_cgroup.min(max_threshold)
    }
}
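
// Illustrative sketch of the `cgroup_threshold` arithmetic above, assuming the default config
// values; the numbers (4 GiB total memory, 1 GiB on-disk file cache) are hypothetical.
#[cfg(test)]
mod cgroup_threshold_tests {
    use super::*;

    #[test]
    fn threshold_respects_file_cache_and_overhead_cap() {
        let config = Config::default();
        let total = 4096 * MiB;

        // On-disk file cache: leave room for the cache to stay fully page-cached, i.e.
        // min(4096 - 1024, 0.85 * 4096) MiB = 3072 MiB.
        assert_eq!(config.cgroup_threshold(total, 1024 * MiB), 3072 * MiB);

        // In-memory (tmpfs) file cache: nothing is subtracted, so the threshold is capped
        // only by the overhead fraction, at 85% of total memory.
        assert_eq!(
            config.cgroup_threshold(total, 0),
            (total as f64 * (1.0 - 0.15)) as u64
        );
    }
}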

impl Runner {
    /// Create a new monitor.
    #[tracing::instrument(skip_all, fields(?config, ?args))]
    pub async fn new(
        config: Config,
        args: &Args,
        ws: WebSocket,
        kill: broadcast::Receiver<()>,
        token: CancellationToken,
    ) -> anyhow::Result<Runner> {
        anyhow::ensure!(
            config.sys_buffer_bytes != 0,
            "invalid monitor Config: sys_buffer_bytes cannot be 0"
        );

        let dispatcher = Dispatcher::new(ws)
            .await
            .context("error creating new dispatcher")?;

        let mut state = Runner {
            config,
            filecache: None,
            cgroup: None,
            dispatcher,
            counter: 1, // NB: must be odd, see the comment about the field for more.
            last_upscale_request_at: None,
            kill,
        };

        let mem = get_total_system_memory();

        let mut file_cache_disk_size = 0;

        // We need to process file cache initialization before cgroup initialization, so that
        // the memory allocated to the file cache is appropriately taken into account when we
        // decide the cgroup's memory limits.
        if let Some(connstr) = &args.pgconnstr {
            info!("initializing file cache");
            let config = match args.file_cache_on_disk {
                true => FileCacheConfig::default_on_disk(),
                false => FileCacheConfig::default_in_memory(),
            };

            let mut file_cache = FileCacheState::new(connstr, config, token.clone())
                .await
                .context("failed to create file cache")?;

            let size = file_cache
                .get_file_cache_size()
                .await
                .context("error getting file cache size")?;

            let new_size = file_cache.config.calculate_cache_size(mem);
            info!(
                initial = bytes_to_mebibytes(size),
                new = bytes_to_mebibytes(new_size),
                "setting initial file cache size",
            );

            // note: even if size == new_size, we want to explicitly set it, just
            // to make sure that we have the permissions to do so
            let actual_size = file_cache
                .set_file_cache_size(new_size)
                .await
                .context("failed to set file cache size, possibly due to inadequate permissions")?;
            if actual_size != new_size {
                info!("file cache size actually got set to {actual_size}")
            }

            if args.file_cache_on_disk {
                file_cache_disk_size = actual_size;
            }

            state.filecache = Some(file_cache);
        }

        if let Some(name) = &args.cgroup {
            // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
            // now, and then set limits later.
            info!("initializing cgroup");

            let cgroup =
                CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;

            let init_value = cgroup::MemoryHistory {
                avg_non_reclaimable: 0,
                samples_count: 0,
                samples_span: Duration::ZERO,
            };
            let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));

            spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
                cgroup.watch(hist_tx).await
            });

            let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size);
            info!(threshold, "set initial cgroup threshold");

            state.cgroup = Some(CgroupState {
                watcher: hist_rx,
                threshold,
            });
        }

        Ok(state)
    }

    /// Attempt to downscale filecache + cgroup
    #[tracing::instrument(skip_all, fields(?target))]
    pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
        // Nothing to adjust
        if self.cgroup.is_none() && self.filecache.is_none() {
            info!("no action needed for downscale (no cgroup or file cache enabled)");
            return Ok((
                true,
                "monitor is not managing cgroup or file cache".to_string(),
            ));
        }

        let requested_mem = target.mem;
        let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
        let (expected_file_cache_size, expected_file_cache_disk_size) = self
            .filecache
            .as_ref()
            .map(|file_cache| {
                let size = file_cache.config.calculate_cache_size(usable_system_memory);
                match file_cache.config.in_memory {
                    true => (size, 0),
                    false => (size, size),
                }
            })
            .unwrap_or((0, 0));
        if let Some(cgroup) = &self.cgroup {
            let (last_time, last_history) = *cgroup.watcher.borrow();

            // TODO: make the duration here configurable.
            if last_time.elapsed() > Duration::from_secs(5) {
                bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
            } else if last_history.samples_count <= 1 {
                bail!("haven't received enough cgroup memory stats yet");
            }

            let new_threshold = self
                .config
                .cgroup_threshold(usable_system_memory, expected_file_cache_disk_size);

            let current = last_history.avg_non_reclaimable;

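            // A hypothetical example with the default 100 MiB buffer: if the new threshold
            // would be 900 MiB while current non-reclaimable usage is 850 MiB, then
            // 900 < 850 + 100 and we refuse the downscale.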
            if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
                let status = format!(
                    "{}: {} MiB (new threshold) < {} MiB (current usage) + {} MiB (downscale buffer)",
                    "calculated memory threshold too low",
                    bytes_to_mebibytes(new_threshold),
                    bytes_to_mebibytes(current),
                    bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
                );

                info!(status, "discontinuing downscale");

                return Ok((false, status));
            }
        }

        // The downscaling has been approved. Downscale the file cache, then the cgroup.
        let mut status = vec![];
        let mut file_cache_disk_size = 0;
        if let Some(file_cache) = &mut self.filecache {
            let actual_usage = file_cache
                .set_file_cache_size(expected_file_cache_size)
                .await
                .context("failed to set file cache size")?;
            if !file_cache.config.in_memory {
                file_cache_disk_size = actual_usage;
            }
            let message = format!(
                "set file cache size to {} MiB (in memory = {})",
                bytes_to_mebibytes(actual_usage),
                file_cache.config.in_memory,
            );
            info!("downscale: {message}");
            status.push(message);
        }

        if let Some(cgroup) = &mut self.cgroup {
            let new_threshold = self
                .config
                .cgroup_threshold(usable_system_memory, file_cache_disk_size);

            let message = format!(
                "set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
                bytes_to_mebibytes(cgroup.threshold),
                bytes_to_mebibytes(new_threshold),
                bytes_to_mebibytes(usable_system_memory)
            );
            cgroup.threshold = new_threshold;
            info!("downscale: {message}");
            status.push(message);
        }

        // TODO: make this status thing less jank
        let status = status.join("; ");
        Ok((true, status))
    }

    /// Handle new resources
    #[tracing::instrument(skip_all, fields(?resources))]
    pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
        if self.filecache.is_none() && self.cgroup.is_none() {
            info!("no action needed for upscale (no cgroup or file cache enabled)");
            return Ok(());
        }

        let new_mem = resources.mem;
        let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);

        let mut file_cache_disk_size = 0;
        if let Some(file_cache) = &mut self.filecache {
            let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
            info!(
                target = bytes_to_mebibytes(expected_usage),
                total = bytes_to_mebibytes(new_mem),
                "updating file cache size",
            );

            let actual_usage = file_cache
                .set_file_cache_size(expected_usage)
                .await
                .context("failed to set file cache size")?;
            if !file_cache.config.in_memory {
                file_cache_disk_size = actual_usage;
            }

            if actual_usage != expected_usage {
                warn!(
                    "file cache was set to a different size than we wanted: target = {} MiB, actual = {} MiB",
                    bytes_to_mebibytes(expected_usage),
                    bytes_to_mebibytes(actual_usage)
                )
            }
        }

        if let Some(cgroup) = &mut self.cgroup {
            let new_threshold = self
                .config
                .cgroup_threshold(usable_system_memory, file_cache_disk_size);

            info!(
                "set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
                bytes_to_mebibytes(cgroup.threshold),
                bytes_to_mebibytes(new_threshold),
                bytes_to_mebibytes(usable_system_memory)
            );
            cgroup.threshold = new_threshold;
        }

        Ok(())
    }

    /// Take in a message and perform some action, such as downscaling or upscaling,
    /// and return a message to be sent back.
    #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
    pub async fn process_message(
        &mut self,
        InboundMsg { inner, id }: InboundMsg,
    ) -> anyhow::Result<Option<OutboundMsg>> {
        match inner {
            InboundMsgKind::UpscaleNotification { granted } => {
                self.handle_upscale(granted)
                    .await
                    .context("failed to handle upscale")?;
                Ok(Some(OutboundMsg::new(
                    OutboundMsgKind::UpscaleConfirmation {},
                    id,
                )))
            }
            InboundMsgKind::DownscaleRequest { target } => self
                .try_downscale(target)
                .await
                .context("failed to downscale")
                .map(|(ok, status)| {
                    Some(OutboundMsg::new(
                        OutboundMsgKind::DownscaleResult { ok, status },
                        id,
                    ))
                }),
            InboundMsgKind::InvalidMessage { error } => {
                warn!(
                    %error, id, "received notification of an invalid message we sent"
                );
                Ok(None)
            }
            InboundMsgKind::InternalError { error } => {
                warn!(error, id, "agent experienced an internal error");
                Ok(None)
            }
            InboundMsgKind::HealthCheck {} => {
                Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
            }
        }
    }

    // TODO: don't propagate errors, probably just warn!?
    #[tracing::instrument(skip_all)]
    pub async fn run(&mut self) -> anyhow::Result<()> {
        info!("starting dispatcher");
        loop {
            tokio::select! {
                signal = self.kill.recv() => {
                    match signal {
                        Ok(()) => return Ok(()),
                        Err(e) => bail!("failed to receive kill signal: {e}")
                    }
                }

                // New memory stats from the cgroup, *may* need to request upscaling, if we've
                // exceeded the threshold
                result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
                    result.context("failed to receive from cgroup memory stats watcher")?;

                    let cgroup = self.cgroup.as_ref().unwrap();

                    let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();

                    // If we haven't exceeded the threshold, then we're all ok
                    if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
                        continue;
                    }

                    // Otherwise, we generally want upscaling. But, if it's been less than 1 second
                    // since the last time we requested upscaling, ignore the event, to avoid
                    // spamming the agent.
                    if let Some(t) = self.last_upscale_request_at {
                        let elapsed = t.elapsed();
                        if elapsed < Duration::from_secs(1) {
                            info!(
                                elapsed_millis = elapsed.as_millis(),
                                avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
                                threshold = bytes_to_mebibytes(cgroup.threshold),
                                "cgroup memory stats are high enough to upscale but too soon to forward the request, ignoring",
                            );
                            continue;
                        }
                    }

                    self.last_upscale_request_at = Some(Instant::now());

                    info!(
                        avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
                        threshold = bytes_to_mebibytes(cgroup.threshold),
                        "cgroup memory stats are high enough to upscale, requesting upscale",
                    );

                    self.counter += 2; // Increment, preserving parity (i.e. keep the
                                       // counter odd). See the field comment for more.
                    self.dispatcher
                        .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
                        .await
                        .context("failed to send message")?;
                },

                // there is a message from the agent
                msg = self.dispatcher.source.next() => {
                    if let Some(msg) = msg {
                        // Don't use 'message' as a key as the string also uses
                        // that for its key
                        info!(?msg, "received message");
                        match msg {
                            Ok(msg) => {
                                let message: InboundMsg = match msg {
                                    Message::Text(text) => {
                                        serde_json::from_str(&text).context("failed to deserialize text message")?
                                    }
                                    other => {
                                        warn!(
                                            // Don't use 'message' as a key as the
                                            // string also uses that for its key
                                            msg = ?other,
                                            "agent should only send text messages but received different type"
                                        );
                                        continue
                                    },
                                };

                                let out = match self.process_message(message.clone()).await {
                                    Ok(Some(out)) => out,
                                    Ok(None) => continue,
                                    Err(e) => {
                                        let error = e.to_string();
                                        warn!(?error, "error handling message");
                                        OutboundMsg::new(
                                            OutboundMsgKind::InternalError {
                                                error
                                            },
                                            message.id
                                        )
                                    }
                                };

                                self.dispatcher
                                    .send(out)
                                    .await
                                    .context("failed to send message")?;
                            }
                            Err(e) => warn!("{e}"),
                        }
                    } else {
                        anyhow::bail!("dispatcher connection closed")
                    }
                }
            }
        }
    }
}