Line data Source code
1 : //! Exposes the `Runner`, which handles messages received from agent and
2 : //! sends upscale requests.
3 : //!
4 : //! This is the "Monitor" part of the monitor binary and is the main entrypoint for
5 : //! all functionality.
6 :
7 : use std::sync::Arc;
8 : use std::{fmt::Debug, mem};
9 :
10 : use anyhow::{bail, Context};
11 : use axum::extract::ws::{Message, WebSocket};
12 : use futures::StreamExt;
13 : use tokio::sync::broadcast;
14 : use tokio::sync::mpsc;
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::{error, info, warn};
17 :
18 : use crate::cgroup::{CgroupWatcher, MemoryLimits, Sequenced};
19 : use crate::dispatcher::Dispatcher;
20 : use crate::filecache::{FileCacheConfig, FileCacheState};
21 : use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
22 : use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
23 :
24 : /// Central struct that interacts with agent, dispatcher, and cgroup to handle
25 : /// signals from the agent.
26 0 : #[derive(Debug)]
27 : pub struct Runner {
28 : config: Config,
29 : filecache: Option<FileCacheState>,
30 : cgroup: Option<Arc<CgroupWatcher>>,
31 : dispatcher: Dispatcher,
32 :
33 : /// We "mint" new message ids by incrementing this counter and taking the value.
34 : ///
35 : /// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated
36 : /// by us vs the autoscaler-agent.
37 : counter: usize,
38 :
39 : /// A signal to kill the main thread produced by `self.run()`. This is triggered
40 : /// when the server receives a new connection. When the thread receives the
41 : /// signal off this channel, it will gracefully shutdown.
42 : kill: broadcast::Receiver<()>,
43 : }
44 :
/// Tunable settings for a `Runner`.
#[derive(Debug)]
pub struct Config {
    /// Estimated amount of memory, in bytes, that the kernel keeps for itself before handing
    /// the rest out to userspace. In other words: the estimated gap between the *actual*
    /// physical memory and what `grep MemTotal /proc/meminfo` reports.
    ///
    /// See `man 5 proc`, which defines MemTotal as "Total usable RAM (i.e., physical RAM minus
    /// a few reserved bits and the kernel binary code)".
    ///
    /// This value is only applied when the system memory is derived from an *externally*
    /// supplied memory size, as opposed to the size the kernel reports about itself.
    ///
    /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
    /// upscale resource amounts (because we might not *actually* have been upscaled yet). This
    /// field should be removed once we have a better solution there.
    sys_buffer_bytes: u64,
}
63 :
64 : impl Default for Config {
65 0 : fn default() -> Self {
66 0 : Self {
67 0 : sys_buffer_bytes: 100 * MiB,
68 0 : }
69 0 : }
70 : }
71 :
72 : impl Runner {
73 : /// Create a new monitor.
74 0 : #[tracing::instrument(skip_all, fields(?config, ?args))]
75 : pub async fn new(
76 : config: Config,
77 : args: &Args,
78 : ws: WebSocket,
79 : kill: broadcast::Receiver<()>,
80 : token: CancellationToken,
81 : ) -> anyhow::Result<Runner> {
82 : anyhow::ensure!(
83 : config.sys_buffer_bytes != 0,
84 : "invalid monitor Config: sys_buffer_bytes cannot be 0"
85 : );
86 :
87 : // *NOTE*: the dispatcher and cgroup manager talk through these channels
88 : // so make sure they each get the correct half, nothing is droppped, etc.
89 : let (notified_send, notified_recv) = mpsc::channel(1);
90 : let (requesting_send, requesting_recv) = mpsc::channel(1);
91 :
92 : let dispatcher = Dispatcher::new(ws, notified_send, requesting_recv)
93 : .await
94 : .context("error creating new dispatcher")?;
95 :
96 : let mut state = Runner {
97 : config,
98 : filecache: None,
99 : cgroup: None,
100 : dispatcher,
101 : counter: 1, // NB: must be odd, see the comment about the field for more.
102 : kill,
103 : };
104 :
105 : let mut file_cache_reserved_bytes = 0;
106 : let mem = get_total_system_memory();
107 :
108 : // We need to process file cache initialization before cgroup initialization, so that the memory
109 : // allocated to the file cache is appropriately taken into account when we decide the cgroup's
110 : // memory limits.
111 : if let Some(connstr) = &args.pgconnstr {
112 0 : info!("initializing file cache");
113 : let config = match args.file_cache_on_disk {
114 : true => FileCacheConfig::default_on_disk(),
115 : false => FileCacheConfig::default_in_memory(),
116 : };
117 :
118 : let mut file_cache = FileCacheState::new(connstr, config, token.clone())
119 : .await
120 : .context("failed to create file cache")?;
121 :
122 : let size = file_cache
123 : .get_file_cache_size()
124 : .await
125 : .context("error getting file cache size")?;
126 :
127 : let new_size = file_cache.config.calculate_cache_size(mem);
128 0 : info!(
129 0 : initial = bytes_to_mebibytes(size),
130 0 : new = bytes_to_mebibytes(new_size),
131 0 : "setting initial file cache size",
132 0 : );
133 :
134 : // note: even if size == new_size, we want to explicitly set it, just
135 : // to make sure that we have the permissions to do so
136 : let actual_size = file_cache
137 : .set_file_cache_size(new_size)
138 : .await
139 : .context("failed to set file cache size, possibly due to inadequate permissions")?;
140 : if actual_size != new_size {
141 0 : info!("file cache size actually got set to {actual_size}")
142 : }
143 : // Mark the resources given to the file cache as reserved, but only if it's in memory.
144 : if !args.file_cache_on_disk {
145 : file_cache_reserved_bytes = actual_size;
146 : }
147 :
148 : state.filecache = Some(file_cache);
149 : }
150 :
151 : if let Some(name) = &args.cgroup {
152 : let (mut cgroup, cgroup_event_stream) =
153 : CgroupWatcher::new(name.clone(), requesting_send)
154 : .context("failed to create cgroup manager")?;
155 :
156 : let available = mem - file_cache_reserved_bytes;
157 :
158 : cgroup
159 : .set_memory_limits(available)
160 : .context("failed to set cgroup memory limits")?;
161 :
162 : let cgroup = Arc::new(cgroup);
163 :
164 : // Some might call this . . . cgroup v2
165 : let cgroup_clone = Arc::clone(&cgroup);
166 :
167 0 : spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
168 0 : cgroup_clone.watch(notified_recv, cgroup_event_stream).await
169 0 : });
170 :
171 : state.cgroup = Some(cgroup);
172 : } else {
173 : // *NOTE*: We need to forget the sender so that its drop impl does not get ran.
174 : // This allows us to poll it in `Monitor::run` regardless of whether we
175 : // are managing a cgroup or not. If we don't forget it, all receives will
176 : // immediately return an error because the sender is droped and it will
177 : // claim all select! statements, effectively turning `Monitor::run` into
178 : // `loop { fail to receive }`.
179 : mem::forget(requesting_send);
180 : }
181 :
182 : Ok(state)
183 : }
184 :
185 : /// Attempt to downscale filecache + cgroup
186 0 : #[tracing::instrument(skip_all, fields(?target))]
187 : pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
188 : // Nothing to adjust
189 : if self.cgroup.is_none() && self.filecache.is_none() {
190 0 : info!("no action needed for downscale (no cgroup or file cache enabled)");
191 : return Ok((
192 : true,
193 : "monitor is not managing cgroup or file cache".to_string(),
194 : ));
195 : }
196 :
197 : let requested_mem = target.mem;
198 : let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
199 : let expected_file_cache_mem_usage = self
200 : .filecache
201 : .as_ref()
202 0 : .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
203 : .unwrap_or(0);
204 : let mut new_cgroup_mem_high = 0;
205 : if let Some(cgroup) = &self.cgroup {
206 : new_cgroup_mem_high = cgroup
207 : .config
208 : .calculate_memory_high_value(usable_system_memory - expected_file_cache_mem_usage);
209 :
210 : let current = cgroup
211 : .current_memory_usage()
212 : .context("failed to fetch cgroup memory")?;
213 :
214 : if new_cgroup_mem_high < current + cgroup.config.memory_high_buffer_bytes {
215 : let status = format!(
216 : "{}: {} MiB (new high) < {} (current usage) + {} (buffer)",
217 : "calculated memory.high too low",
218 : bytes_to_mebibytes(new_cgroup_mem_high),
219 : bytes_to_mebibytes(current),
220 : bytes_to_mebibytes(cgroup.config.memory_high_buffer_bytes)
221 : );
222 :
223 0 : info!(status, "discontinuing downscale");
224 :
225 : return Ok((false, status));
226 : }
227 : }
228 :
229 : // The downscaling has been approved. Downscale the file cache, then the cgroup.
230 : let mut status = vec![];
231 : let mut file_cache_mem_usage = 0;
232 : if let Some(file_cache) = &mut self.filecache {
233 : let actual_usage = file_cache
234 : .set_file_cache_size(expected_file_cache_mem_usage)
235 : .await
236 : .context("failed to set file cache size")?;
237 : if file_cache.config.in_memory {
238 : file_cache_mem_usage = actual_usage;
239 : }
240 : let message = format!(
241 : "set file cache size to {} MiB (in memory = {})",
242 : bytes_to_mebibytes(actual_usage),
243 : file_cache.config.in_memory,
244 : );
245 0 : info!("downscale: {message}");
246 : status.push(message);
247 : }
248 :
249 : if let Some(cgroup) = &self.cgroup {
250 : let available_memory = usable_system_memory - file_cache_mem_usage;
251 :
252 : if file_cache_mem_usage != expected_file_cache_mem_usage {
253 : new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
254 : }
255 :
256 : let limits = MemoryLimits::new(
257 : // new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
258 : // since it is properly initialized in the previous cgroup if let block
259 : new_cgroup_mem_high,
260 : available_memory,
261 : );
262 : cgroup
263 : .set_limits(&limits)
264 : .context("failed to set cgroup memory limits")?;
265 :
266 : let message = format!(
267 : "set cgroup memory.high to {} MiB, of new max {} MiB",
268 : bytes_to_mebibytes(new_cgroup_mem_high),
269 : bytes_to_mebibytes(available_memory)
270 : );
271 0 : info!("downscale: {message}");
272 : status.push(message);
273 : }
274 :
275 : // TODO: make this status thing less jank
276 : let status = status.join("; ");
277 : Ok((true, status))
278 : }
279 :
280 : /// Handle new resources
281 0 : #[tracing::instrument(skip_all, fields(?resources))]
282 : pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
283 : if self.filecache.is_none() && self.cgroup.is_none() {
284 0 : info!("no action needed for upscale (no cgroup or file cache enabled)");
285 : return Ok(());
286 : }
287 :
288 : let new_mem = resources.mem;
289 : let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
290 :
291 : // Get the file cache's expected contribution to the memory usage
292 : let mut file_cache_mem_usage = 0;
293 : if let Some(file_cache) = &mut self.filecache {
294 : let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
295 0 : info!(
296 0 : target = bytes_to_mebibytes(expected_usage),
297 0 : total = bytes_to_mebibytes(new_mem),
298 0 : "updating file cache size",
299 0 : );
300 :
301 : let actual_usage = file_cache
302 : .set_file_cache_size(expected_usage)
303 : .await
304 : .context("failed to set file cache size")?;
305 : if file_cache.config.in_memory {
306 : file_cache_mem_usage = actual_usage;
307 : }
308 :
309 : if actual_usage != expected_usage {
310 0 : warn!(
311 0 : "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib",
312 0 : bytes_to_mebibytes(expected_usage),
313 0 : bytes_to_mebibytes(actual_usage)
314 0 : )
315 : }
316 : }
317 :
318 : if let Some(cgroup) = &self.cgroup {
319 : let available_memory = usable_system_memory - file_cache_mem_usage;
320 : let new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
321 0 : info!(
322 0 : target = bytes_to_mebibytes(new_cgroup_mem_high),
323 0 : total = bytes_to_mebibytes(new_mem),
324 0 : name = cgroup.path(),
325 0 : "updating cgroup memory.high",
326 0 : );
327 : let limits = MemoryLimits::new(new_cgroup_mem_high, available_memory);
328 : cgroup
329 : .set_limits(&limits)
330 : .context("failed to set file cache size")?;
331 : }
332 :
333 : Ok(())
334 : }
335 :
336 : /// Take in a message and perform some action, such as downscaling or upscaling,
337 : /// and return a message to be send back.
338 0 : #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
339 : pub async fn process_message(
340 : &mut self,
341 : InboundMsg { inner, id }: InboundMsg,
342 : ) -> anyhow::Result<Option<OutboundMsg>> {
343 : match inner {
344 : InboundMsgKind::UpscaleNotification { granted } => {
345 : self.handle_upscale(granted)
346 : .await
347 : .context("failed to handle upscale")?;
348 : self.dispatcher
349 : .notify_upscale(Sequenced::new(granted))
350 : .await
351 : .context("failed to notify notify cgroup of upscale")?;
352 : Ok(Some(OutboundMsg::new(
353 : OutboundMsgKind::UpscaleConfirmation {},
354 : id,
355 : )))
356 : }
357 : InboundMsgKind::DownscaleRequest { target } => self
358 : .try_downscale(target)
359 : .await
360 : .context("failed to downscale")
361 0 : .map(|(ok, status)| {
362 0 : Some(OutboundMsg::new(
363 0 : OutboundMsgKind::DownscaleResult { ok, status },
364 0 : id,
365 0 : ))
366 0 : }),
367 : InboundMsgKind::InvalidMessage { error } => {
368 0 : warn!(
369 0 : %error, id, "received notification of an invalid message we sent"
370 0 : );
371 : Ok(None)
372 : }
373 : InboundMsgKind::InternalError { error } => {
374 0 : warn!(error, id, "agent experienced an internal error");
375 : Ok(None)
376 : }
377 : InboundMsgKind::HealthCheck {} => {
378 : Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
379 : }
380 : }
381 : }
382 :
383 : // TODO: don't propagate errors, probably just warn!?
384 0 : #[tracing::instrument(skip_all)]
385 : pub async fn run(&mut self) -> anyhow::Result<()> {
386 0 : info!("starting dispatcher");
387 : loop {
388 0 : tokio::select! {
389 0 : signal = self.kill.recv() => {
390 : match signal {
391 : Ok(()) => return Ok(()),
392 : Err(e) => bail!("failed to receive kill signal: {e}")
393 : }
394 : }
395 : // we need to propagate an upscale request
396 0 : request = self.dispatcher.request_upscale_events.recv() => {
397 : if request.is_none() {
398 : bail!("failed to listen for upscale event from cgroup")
399 : }
400 0 : info!("cgroup asking for upscale; forwarding request");
401 : self.counter += 2; // Increment, preserving parity (i.e. keep the
402 : // counter odd). See the field comment for more.
403 : self.dispatcher
404 : .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
405 : .await
406 : .context("failed to send message")?;
407 : }
408 : // there is a message from the agent
409 0 : msg = self.dispatcher.source.next() => {
410 : if let Some(msg) = msg {
411 : // Don't use 'message' as a key as the string also uses
412 : // that for its key
413 0 : info!(?msg, "received message");
414 : match msg {
415 : Ok(msg) => {
416 : let message: InboundMsg = match msg {
417 : Message::Text(text) => {
418 : serde_json::from_str(&text).context("failed to deserialize text message")?
419 : }
420 : other => {
421 0 : warn!(
422 0 : // Don't use 'message' as a key as the
423 0 : // string also uses that for its key
424 0 : msg = ?other,
425 0 : "agent should only send text messages but received different type"
426 0 : );
427 : continue
428 : },
429 : };
430 :
431 : let out = match self.process_message(message.clone()).await {
432 : Ok(Some(out)) => out,
433 : Ok(None) => continue,
434 : Err(e) => {
435 : let error = e.to_string();
436 0 : warn!(?error, "error handling message");
437 : OutboundMsg::new(
438 : OutboundMsgKind::InternalError {
439 : error
440 : },
441 : message.id
442 : )
443 : }
444 : };
445 :
446 : self.dispatcher
447 : .send(out)
448 : .await
449 : .context("failed to send message")?;
450 : }
451 0 : Err(e) => warn!("{e}"),
452 : }
453 : } else {
454 : anyhow::bail!("dispatcher connection closed")
455 : }
456 : }
457 : }
458 : }
459 : }
460 : }
|