#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]

use anyhow::Context;
use axum::{
    extract::{ws::WebSocket, State, WebSocketUpgrade},
    response::Response,
};
use axum::{routing::get, Router, Server};
use clap::Parser;
use futures::Future;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with agent
pub mod dispatcher;
pub mod protocol;

pub mod cgroup;
pub mod filecache;
pub mod runner;

/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
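///
/// An example invocation might look like the following (the binary name and
/// cgroup name here are illustrative assumptions, not fixed by this crate; the
/// agent-facing address comes from the `addr` docs below):
///
/// ```text
/// vm-monitor --cgroup neon-postgres --pgconnstr "<file cache connstr>" --addr "0.0.0.0:10301"
/// ```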
#[derive(Debug, Parser)]
pub struct Args {
    /// The name of the cgroup we should monitor for memory.high events. This
    /// is the cgroup that postgres should be running in.
    #[arg(short, long)]
    pub cgroup: Option<String>,

    /// The connection string for the Postgres file cache we should manage.
    #[arg(short, long)]
    pub pgconnstr: Option<String>,

    /// The address we should listen on for connection requests. For the
    /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
    #[arg(short, long)]
    pub addr: String,
}

impl Args {
    pub fn addr(&self) -> &str {
        &self.addr
    }
}

/// The number of bytes in one mebibyte.
#[allow(non_upper_case_globals)]
const MiB: u64 = 1 << 20;

/// Convert a quantity in bytes to a quantity in mebibytes, generally for display
/// purposes. (Most calculations in this crate use bytes directly)
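///
/// # Example
///
/// A quick illustrative check (marked `ignore` since the crate path is not
/// spelled out here):
///
/// ```ignore
/// // 2 MiB worth of bytes comes out as 2.0 mebibytes.
/// assert_eq!(bytes_to_mebibytes(2 * 1024 * 1024), 2.0);
/// ```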
pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
    (bytes as f32) / (MiB as f32)
}

pub fn get_total_system_memory() -> u64 {
    System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}

/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
    /// Used to close old connections.
    ///
    /// When a new connection is made, we send a message signalling to the old
    /// connection to close.
    pub sender: broadcast::Sender<()>,

    /// Used to cancel all spawned tasks in the monitor.
    pub token: CancellationToken,

    // The CLI args
    pub args: &'static Args,
}

/// Spawn a task that may get cancelled by the provided [`CancellationToken`].
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If it is not desirable for the future to
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
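///
/// # Example
///
/// A minimal usage sketch (the future and the logging closure below are made up
/// for illustration; they are not part of this crate):
///
/// ```ignore
/// let token = CancellationToken::new();
/// let handle = spawn_with_cancel(
///     token.clone(),
///     // `f` runs once the future resolves, before its output is returned.
///     |res: &anyhow::Result<()>| {
///         if let Err(error) = res {
///             tracing::error!(?error, "background task failed");
///         }
///     },
///     async { Ok::<(), anyhow::Error>(()) },
/// );
///
/// // Cancelling the token makes the handle resolve to `None` instead.
/// token.cancel();
/// ```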
pub fn spawn_with_cancel<T, F>(
    token: CancellationToken,
    f: F,
    future: T,
) -> JoinHandle<Option<T::Output>>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
    F: FnOnce(&T::Output) + Send + 'static,
{
    tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => {
                info!("received global kill signal");
                None
            }
            res = future => {
                f(&res);
                Some(res)
            }
        }
    })
}

/// The entrypoint to the binary.
///
/// Set up tracing, parse arguments, and start an http server.
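///
/// # Example
///
/// A sketch of how a binary might drive this function (the `main` below is an
/// assumption for illustration, not this crate's actual entrypoint; note the
/// leak used to obtain the `&'static Args` that the server state requires):
///
/// ```ignore
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     tracing_subscriber::fmt().init();
///     let args: &'static Args = Box::leak(Box::new(Args::parse()));
///     start(args, CancellationToken::new()).await
/// }
/// ```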
pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
    // This channel is used to close old connections. When a new connection is
    // made, we send a message signalling to the old connection to close.
    let (sender, _) = tokio::sync::broadcast::channel::<()>(1);

    let app = Router::new()
        // This route gets upgraded to a websocket connection. We only support
        // one connection at a time, which we enforce by killing old connections
        // when we receive a new one.
        .route("/monitor", get(ws_handler))
        .with_state(ServerState {
            sender,
            token,
            args,
        });

    let addr = args.addr();
    let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail"))
        .with_context(|| format!("failed to bind to {addr}"))?;

    info!(addr, "server bound");

    bound
        .serve(app.into_make_service())
        .await
        .context("server exited")?;

    Ok(())
}

/// Handles incoming websocket connections.
///
/// If we are already connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(ServerState {
        sender,
        token,
        args,
    }): State<ServerState>,
) -> Response {
    // Kill the old monitor
    info!("closing old connection if there is one");
    let _ = sender.send(());

    // Start the new one. Wow, the cycle of death and rebirth
    let closer = sender.subscribe();
    ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
}

/// Starts the monitor. If startup fails or the monitor exits, an error will
/// be logged and our internal state will be reset to allow for new connections.
#[tracing::instrument(skip_all)]
async fn start_monitor(
    ws: WebSocket,
    args: &Args,
    kill: broadcast::Receiver<()>,
    token: CancellationToken,
) {
    info!(
        ?args,
        "accepted new websocket connection -> starting monitor"
    );
    let timeout = Duration::from_secs(4);
    let monitor = tokio::time::timeout(
        timeout,
        Runner::new(Default::default(), args, ws, kill, token),
    )
    .await;
    let mut monitor = match monitor {
        Ok(Ok(monitor)) => monitor,
        Ok(Err(error)) => {
            error!(?error, "failed to create monitor");
            return;
        }
        Err(_) => {
            error!(
                ?timeout,
                "creating monitor timed out (probably waiting to receive protocol range)"
            );
            return;
        }
    };
    info!("connected to agent");

    match monitor.run().await {
        Ok(()) => info!("monitor was killed due to new connection"),
        Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
    }
}