#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]

use anyhow::Context;
use axum::{
    extract::{ws::WebSocket, State, WebSocketUpgrade},
    response::Response,
};
use axum::{routing::get, Router};
use clap::Parser;
use futures::Future;
use std::net::SocketAddr;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::net::TcpListener;
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with the agent
pub mod dispatcher;
pub mod protocol;

pub mod cgroup;
pub mod filecache;
pub mod runner;

/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
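///
/// A hypothetical invocation; the cgroup name and connection string are
/// illustrative:
///
/// ```text
/// vm-monitor --addr 0.0.0.0:10301 --cgroup neon-postgres --pgconnstr "host=127.0.0.1 ..."
/// ```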
#[derive(Debug, Parser)]
pub struct Args {
    /// The name of the cgroup we should monitor for memory.high events. This
    /// is the cgroup that postgres should be running in.
    #[arg(short, long)]
    pub cgroup: Option<String>,

    /// The connection string for the Postgres file cache we should manage.
    #[arg(short, long)]
    pub pgconnstr: Option<String>,

    /// The address we should listen on for connection requests. For the
    /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
    #[arg(short, long)]
    pub addr: String,
}

impl Args {
    pub fn addr(&self) -> &str {
        &self.addr
    }
}

/// The number of bytes in one mebibyte.
#[allow(non_upper_case_globals)]
const MiB: u64 = 1 << 20;

/// Convert a quantity in bytes to a quantity in mebibytes, generally for display
/// purposes. (Most calculations in this crate use bytes directly)
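///
/// A quick illustrative check (exact, since the inputs are powers of two):
///
/// ```ignore
/// assert_eq!(bytes_to_mebibytes(2 * 1024 * 1024), 2.0);
/// ```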
pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
    (bytes as f32) / (MiB as f32)
}

pub fn get_total_system_memory() -> u64 {
    System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}

/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
    /// Used to close old connections.
    ///
    /// When a new connection is made, we send a message signalling to the old
    /// connection to close.
    pub sender: broadcast::Sender<()>,

    /// Used to cancel all spawned tasks in the monitor.
    pub token: CancellationToken,

    // The CLI args
    pub args: &'static Args,
}

/// Spawn a task that may get cancelled by the provided [`CancellationToken`].
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If it is not desirable for the future to
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
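///
/// A minimal usage sketch; the pending future and log message are illustrative:
///
/// ```ignore
/// let token = CancellationToken::new();
/// let handle = spawn_with_cancel(
///     token.clone(),
///     |res| tracing::error!(?res, "watcher exited unexpectedly"),
///     std::future::pending::<()>(),
/// );
/// // Cancelling the token resolves the handle with `None`.
/// token.cancel();
/// ```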
pub fn spawn_with_cancel<T, F>(
    token: CancellationToken,
    f: F,
    future: T,
) -> JoinHandle<Option<T::Output>>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
    F: FnOnce(&T::Output) + Send + 'static,
{
    tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => {
                info!("received global kill signal");
                None
            }
            res = future => {
                f(&res);
                Some(res)
            }
        }
    })
}

/// The entrypoint to the binary.
///
/// Set up tracing, parse arguments, and start an http server.
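///
/// A hypothetical caller; leaking the parsed `Args` to obtain the required
/// `&'static` lifetime is an assumption about how the binary wires this up:
///
/// ```ignore
/// let args: &'static Args = Box::leak(Box::new(Args::parse()));
/// let token = CancellationToken::new();
/// start(args, token).await?;
/// ```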
pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
    // This channel is used to close old connections. When a new connection is
    // made, we send a message signalling to the old connection to close.
    let (sender, _) = tokio::sync::broadcast::channel::<()>(1);

    let app = Router::new()
        // This route gets upgraded to a websocket connection. We only support
        // one connection at a time, which we enforce by killing old connections
        // when we receive a new one.
        .route("/monitor", get(ws_handler))
        .with_state(ServerState {
            sender,
            token,
            args,
        });

    let addr_str = args.addr();
    let addr: SocketAddr = addr_str
        .parse()
        .with_context(|| format!("failed to parse address {addr_str}"))?;

    let listener = TcpListener::bind(&addr)
        .await
        .with_context(|| format!("failed to bind to {addr}"))?;
    info!(addr_str, "server bound");
    axum::serve(listener, app.into_make_service())
        .await
        .context("server exited")?;

    Ok(())
}

/// Handles incoming websocket connections.
///
/// If we are already connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(ServerState {
        sender,
        token,
        args,
    }): State<ServerState>,
) -> Response {
    // Kill the old monitor
    info!("closing old connection if there is one");
    let _ = sender.send(());

    // Start the new one. Wow, the cycle of death and rebirth
    let closer = sender.subscribe();
    ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
}

/// Starts the monitor. If startup fails or the monitor exits, an error will
/// be logged and our internal state will be reset to allow for new connections.
#[tracing::instrument(skip_all)]
async fn start_monitor(
    ws: WebSocket,
    args: &Args,
    kill: broadcast::Receiver<()>,
    token: CancellationToken,
) {
    info!(
        ?args,
        "accepted new websocket connection -> starting monitor"
    );
    let timeout = Duration::from_secs(4);
    let monitor = tokio::time::timeout(
        timeout,
        Runner::new(Default::default(), args, ws, kill, token),
    )
    .await;
    let mut monitor = match monitor {
        Ok(Ok(monitor)) => monitor,
        Ok(Err(error)) => {
            error!(?error, "failed to create monitor");
            return;
        }
        Err(_) => {
            error!(
                ?timeout,
                "creating monitor timed out (probably waiting to receive protocol range)"
            );
            return;
        }
    };
    info!("connected to agent");

    match monitor.run().await {
        Ok(()) => info!("monitor was killed due to new connection"),
        Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
    }
}