#![cfg(target_os = "linux")]

use anyhow::Context;
use axum::{
    extract::{ws::WebSocket, State, WebSocketUpgrade},
    response::Response,
};
use axum::{routing::get, Router, Server};
use clap::Parser;
use futures::Future;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with the agent
pub mod dispatcher;
pub mod protocol;

pub mod cgroup;
pub mod filecache;
pub mod runner;

/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
#[derive(Debug, Parser)]
pub struct Args {
    /// The name of the cgroup we should monitor for memory.high events. This
    /// is the cgroup that postgres should be running in.
    #[arg(short, long)]
    pub cgroup: Option<String>,

    /// The connection string for the Postgres file cache we should manage.
    #[arg(short, long)]
    pub pgconnstr: Option<String>,

    /// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
    /// kernel's page cache), and therefore should not count against available memory.
    //
    // NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
    // than a roundabout way, via whether it's on disk), but in order to be backwards compatible
    // during the switch away from an in-memory file cache, we had to default to the previous
    // behavior.
    #[arg(long)]
    pub file_cache_on_disk: bool,

    /// The address we should listen on for connection requests. For the
    /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
    #[arg(short, long)]
    pub addr: String,
}

impl Args {
    pub fn addr(&self) -> &str {
        &self.addr
    }
}
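
// Illustrative sketch, not part of the original source: exercising the CLI
// surface defined by `Args` above through clap's `Parser::try_parse_from`.
// The cgroup name and connection string below are placeholder values, not
// values the monitor requires.
#[cfg(test)]
mod args_examples {
    use super::Args;
    use clap::Parser;

    #[test]
    fn parses_minimal_and_full_invocations() {
        // Only `--addr` is required; the other flags are optional.
        let args = Args::try_parse_from(["vm-monitor", "--addr", "0.0.0.0:10301"])
            .expect("minimal invocation should parse");
        assert_eq!(args.addr(), "0.0.0.0:10301");
        assert!(args.cgroup.is_none());
        assert!(!args.file_cache_on_disk);

        // All flags supplied (values are placeholders).
        let args = Args::try_parse_from([
            "vm-monitor",
            "--cgroup",
            "neon-postgres",
            "--pgconnstr",
            "host=localhost port=5432 dbname=postgres",
            "--file-cache-on-disk",
            "--addr",
            "127.0.0.1:10369",
        ])
        .expect("full invocation should parse");
        assert_eq!(args.cgroup.as_deref(), Some("neon-postgres"));
        assert!(args.file_cache_on_disk);
    }
}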

/// The number of bytes in one mebibyte.
#[allow(non_upper_case_globals)]
const MiB: u64 = 1 << 20;

/// Convert a quantity in bytes to a quantity in mebibytes, generally for display
/// purposes. (Most calculations in this crate use bytes directly)
pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
    (bytes as f32) / (MiB as f32)
}
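
// Illustrative sketch, not part of the original source: the conversion above
// is a plain division by `MiB`, so powers-of-two byte counts convert exactly
// in `f32`.
#[cfg(test)]
mod conversion_examples {
    use super::{bytes_to_mebibytes, MiB};

    #[test]
    fn bytes_convert_to_mebibytes() {
        assert_eq!(bytes_to_mebibytes(512 * MiB), 512.0);
        assert_eq!(bytes_to_mebibytes(MiB / 2), 0.5);
    }
}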

pub fn get_total_system_memory() -> u64 {
    System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}

/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
    /// Used to close old connections.
    ///
    /// When a new connection is made, we send a message signalling to the old
    /// connection to close.
    pub sender: broadcast::Sender<()>,

    /// Used to cancel all spawned tasks in the monitor.
    pub token: CancellationToken,

    // The CLI args
    pub args: &'static Args,
}
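
// Illustrative sketch, not part of the original source: the single-connection
// policy that `ServerState::sender` enables. Every accepted websocket
// subscribes to the broadcast channel; when a newer connection sends `()`,
// the older one receives it and shuts down (see `ws_handler` below). Assumes
// tokio's `rt` and `macros` features, which this file already relies on.
#[cfg(test)]
mod server_state_examples {
    use tokio::sync::broadcast;

    #[tokio::test]
    async fn newer_connection_signals_older_one() {
        let (sender, _) = broadcast::channel::<()>(1);

        // The "old" connection subscribes when it is accepted.
        let mut old_connection = sender.subscribe();

        // A "new" connection arrives and broadcasts the close signal.
        let _ = sender.send(());

        // The old connection observes the signal and would shut itself down.
        assert!(old_connection.recv().await.is_ok());
    }
}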

/// Spawn a task that may get cancelled by the provided [`CancellationToken`].
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If it is not desirable for the future to
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
pub fn spawn_with_cancel<T, F>(
    token: CancellationToken,
    f: F,
    future: T,
) -> JoinHandle<Option<T::Output>>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
    F: FnOnce(&T::Output) + Send + 'static,
{
    tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => {
                info!("received global kill signal");
                None
            }
            res = future => {
                f(&res);
                Some(res)
            }
        }
    })
}
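
// Illustrative sketch, not part of the original source: driving
// `spawn_with_cancel` with a future that never resolves, which is the shape
// it is designed for. Cancelling the token makes the task return `None`.
// Assumes tokio's `rt` and `macros` features, which this file already uses.
#[cfg(test)]
mod spawn_with_cancel_examples {
    use super::spawn_with_cancel;
    use tokio_util::sync::CancellationToken;

    #[tokio::test]
    async fn cancelled_task_resolves_to_none() {
        let token = CancellationToken::new();

        // Stand-in for a long-running future like `CgroupWatcher::watch`;
        // `f` would log the output if the future ever did resolve.
        let handle = spawn_with_cancel(
            token.clone(),
            |_output: &()| tracing::error!("task exited unexpectedly"),
            std::future::pending::<()>(),
        );

        token.cancel();
        assert_eq!(handle.await.unwrap(), None);
    }
}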

/// The entrypoint to the binary.
///
/// Set up tracing, parse arguments, and start an http server.
pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
    // This channel is used to close old connections. When a new connection is
    // made, we send a message signalling to the old connection to close.
    let (sender, _) = tokio::sync::broadcast::channel::<()>(1);

    let app = Router::new()
        // This route gets upgraded to a websocket connection. We only support
        // one connection at a time, which we enforce by killing old connections
        // when we receive a new one.
        .route("/monitor", get(ws_handler))
        .with_state(ServerState {
            sender,
            token,
            args,
        });

    let addr = args.addr();
    let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail"))
        .with_context(|| format!("failed to bind to {addr}"))?;

    info!(addr, "server bound");

    bound
        .serve(app.into_make_service())
        .await
        .context("server exited")?;

    Ok(())
}

/// Handles incoming websocket connections.
///
/// If we are already connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(ServerState {
        sender,
        token,
        args,
    }): State<ServerState>,
) -> Response {
    // Kill the old monitor
    info!("closing old connection if there is one");
    let _ = sender.send(());

    // Start the new one. Wow, the cycle of death and rebirth
    let closer = sender.subscribe();
    ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
}

/// Starts the monitor. If startup fails or the monitor exits, an error will
/// be logged and our internal state will be reset to allow for new connections.
#[tracing::instrument(skip_all)]
async fn start_monitor(
    ws: WebSocket,
    args: &Args,
    kill: broadcast::Receiver<()>,
    token: CancellationToken,
) {
    info!(
        ?args,
        "accepted new websocket connection -> starting monitor"
    );
    let timeout = Duration::from_secs(4);
    let monitor = tokio::time::timeout(
        timeout,
        Runner::new(Default::default(), args, ws, kill, token),
    )
    .await;
    let mut monitor = match monitor {
        Ok(Ok(monitor)) => monitor,
        Ok(Err(error)) => {
            error!(?error, "failed to create monitor");
            return;
        }
        Err(_) => {
            error!(
                ?timeout,
                "creating monitor timed out (probably waiting to receive protocol range)"
            );
            return;
        }
    };
    info!("connected to agent");

    match monitor.run().await {
        Ok(()) => info!("monitor was killed due to new connection"),
        Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
    }
}