#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]

use std::fmt::Debug;
use std::net::SocketAddr;
use std::time::Duration;

use anyhow::Context;
use axum::Router;
use axum::extract::ws::WebSocket;
use axum::extract::{State, WebSocketUpgrade};
use axum::response::Response;
use axum::routing::get;
use clap::Parser;
use futures::Future;
use runner::Runner;
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

// Code that interfaces with the agent
pub mod dispatcher;
pub mod protocol;

pub mod cgroup;
pub mod filecache;
pub mod runner;

/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
#[derive(Debug, Parser)]
pub struct Args {
    /// The name of the cgroup we should monitor for memory.high events. This
    /// is the cgroup that postgres should be running in.
    #[arg(short, long)]
    pub cgroup: Option<String>,

    /// The connection string for the Postgres file cache we should manage.
    #[arg(short, long)]
    pub pgconnstr: Option<String>,

    /// The address we should listen on for connection requests. For the
    /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
    #[arg(short, long)]
    pub addr: String,
}

impl Args {
    pub fn addr(&self) -> &str {
        &self.addr
    }
}
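
// A minimal sketch of how these flags parse, using clap's `Parser::parse_from`;
// the flag values below are illustrative, not deployment defaults.
#[cfg(test)]
mod args_tests {
    use super::*;

    #[test]
    fn parse_addr_from_cli() {
        // argv[0] is ignored by clap; only `--addr` is required.
        let args = Args::parse_from(["vm-monitor", "--addr", "0.0.0.0:10301"]);
        assert_eq!(args.addr(), "0.0.0.0:10301");
        assert!(args.cgroup.is_none());
    }
}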

/// The number of bytes in one mebibyte.
#[allow(non_upper_case_globals)]
const MiB: u64 = 1 << 20;

/// Convert a quantity in bytes to a quantity in mebibytes, generally for display
/// purposes. (Most calculations in this crate use bytes directly)
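///
/// A doctest sketch (`ignore`d, since the crate path available to doctests is
/// an assumption here):
///
/// ```ignore
/// // Powers of two convert exactly in f32.
/// assert_eq!(bytes_to_mebibytes(2 * 1024 * 1024), 2.0);
/// ```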
pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
    (bytes as f32) / (MiB as f32)
}

/// Fetch the system's total memory using `sysinfo`.
pub fn get_total_system_memory() -> u64 {
    System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}

/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
    /// Used to close old connections.
    ///
    /// When a new connection is made, we send a message signalling to the old
    /// connection to close.
    pub sender: broadcast::Sender<()>,

    /// Used to cancel all spawned tasks in the monitor.
    pub token: CancellationToken,

    /// The CLI args
    pub args: &'static Args,
}
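
// A self-contained sketch of the single-connection pattern behind
// `ServerState::sender`: each new connection broadcasts `()`, and the previous
// connection's subscriber treats any received message as a request to shut
// down. (The test harness assumes tokio's `rt` and `macros` features, which
// this crate already relies on.)
#[cfg(test)]
mod close_signal_tests {
    use tokio::sync::broadcast;

    #[tokio::test]
    async fn new_connection_closes_old_one() {
        let (sender, _) = broadcast::channel::<()>(1);

        // The "old" connection subscribes first...
        let mut old_conn = sender.subscribe();
        // ...and a "new" connection signals it to close.
        let _ = sender.send(());
        assert!(old_conn.recv().await.is_ok());
    }
}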

/// Spawn a task that may get cancelled by the provided [`CancellationToken`].
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If the future is never expected to
/// resolve, as is the case for [`cgroup::CgroupWatcher::watch`], its unexpected
/// output can be logged with `f`.
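///
/// A usage sketch (`ignore`d doctest; the pending future and the logging
/// closure are illustrative):
///
/// ```ignore
/// let token = CancellationToken::new();
/// let handle = spawn_with_cancel(
///     token.clone(),
///     |res| tracing::error!(?res, "watcher exited unexpectedly"),
///     std::future::pending::<()>(),
/// );
/// token.cancel(); // `handle` now resolves to Ok(None)
/// ```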
pub fn spawn_with_cancel<T, F>(
    token: CancellationToken,
    f: F,
    future: T,
) -> JoinHandle<Option<T::Output>>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
    F: FnOnce(&T::Output) + Send + 'static,
{
    tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => {
                info!("received global kill signal");
                None
            }
            res = future => {
                f(&res);
                Some(res)
            }
        }
    })
}

/// The entrypoint to the binary.
///
/// Set up tracing, parse arguments, and start an http server.
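///
/// A caller sketch (`ignore`d; leaking the parsed [`Args`] to obtain the
/// `&'static` lifetime mirrors typical usage, but the surrounding binary is
/// assumed):
///
/// ```ignore
/// let args: &'static Args = Box::leak(Box::new(Args::parse()));
/// let token = CancellationToken::new();
/// start(args, token).await?;
/// ```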
pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
    // This channel is used to close old connections. When a new connection is
    // made, we send a message signalling to the old connection to close.
    let (sender, _) = tokio::sync::broadcast::channel::<()>(1);

    let app = Router::new()
        // This route gets upgraded to a websocket connection. We only support
        // one connection at a time, which we enforce by killing old connections
        // when we receive a new one.
        .route("/monitor", get(ws_handler))
        .with_state(ServerState {
            sender,
            token,
            args,
        });

    let addr_str = args.addr();
    let addr: SocketAddr = addr_str.parse().expect("parsing address should not fail");

    let listener = TcpListener::bind(&addr)
        .await
        .with_context(|| format!("failed to bind to {addr}"))?;
    info!(addr_str, "server bound");
    axum::serve(listener, app.into_make_service())
        .await
        .context("server exited")?;

    Ok(())
}

/// Handles incoming websocket connections.
///
/// If we are already connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(ServerState {
        sender,
        token,
        args,
    }): State<ServerState>,
) -> Response {
    // Kill the old monitor
    info!("closing old connection if there is one");
    let _ = sender.send(());

    // Start the new one. Wow, the cycle of death and rebirth
    let closer = sender.subscribe();
    ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
}

/// Starts the monitor. If startup fails or the monitor exits, an error will
/// be logged and our internal state will be reset to allow for new connections.
#[tracing::instrument(skip_all)]
async fn start_monitor(
    ws: WebSocket,
    args: &Args,
    kill: broadcast::Receiver<()>,
    token: CancellationToken,
) {
    info!(
        ?args,
        "accepted new websocket connection -> starting monitor"
    );
    let timeout = Duration::from_secs(4);
    let monitor = tokio::time::timeout(
        timeout,
        Runner::new(Default::default(), args, ws, kill, token),
    )
    .await;
    let mut monitor = match monitor {
        Ok(Ok(monitor)) => monitor,
        Ok(Err(e)) => {
            error!(error = format_args!("{e:#}"), "failed to create monitor");
            return;
        }
        Err(_) => {
            error!(?timeout, "creating monitor timed out");
            return;
        }
    };
    info!("connected to agent");

    match monitor.run().await {
        Ok(()) => info!("monitor was killed due to new connection"),
        Err(e) => error!(
            error = format_args!("{e:#}"),
            "monitor terminated unexpectedly"
        ),
    }
}
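
// A self-contained sketch of the nested-`Result` shape matched above:
// `tokio::time::timeout` wraps the constructor's own `Result`, so the outer
// `Err` means the deadline elapsed while the inner `Err` means construction
// itself failed. (Assumes tokio's `time` feature, which this crate uses.)
#[cfg(test)]
mod timeout_tests {
    use std::time::Duration;

    #[tokio::test]
    async fn timeout_wraps_inner_result() {
        // A constructor that succeeds within the deadline: outer and inner Ok.
        let ok = tokio::time::timeout(Duration::from_secs(1), async { Ok::<u8, ()>(7) }).await;
        assert!(matches!(ok, Ok(Ok(7))));

        // A future that never resolves hits the outer timeout instead.
        let timed_out =
            tokio::time::timeout(Duration::from_millis(5), std::future::pending::<()>()).await;
        assert!(timed_out.is_err());
    }
}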