use std::sync::Arc;
use std::thread;
use std::time::Duration;

use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};

use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};

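/// How long the monitor sleeps between check iterations in `run()`.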
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);

struct ComputeMonitor {
    compute: Arc<ComputeNode>,

    /// The last moment when Postgres had some activity
    /// that should prevent the compute from being suspended.
    last_active: Option<DateTime<Utc>>,

    /// The moment when we last tried to check Postgres.
    last_checked: DateTime<Utc>,
    /// The last moment when a Postgres check succeeded.
    last_up: DateTime<Utc>,

    /// Used only for tracking statistics changes between
    /// monitor runs; can be outdated.
    active_time: Option<f64>,
    /// Used only for tracking statistics changes between
    /// monitor runs; can be outdated.
    sessions: Option<i64>,

    /// Use the experimental statistics-based activity monitor. It's no longer
    /// 'experimental' per se, as it's enabled for everyone, but we keep the
    /// flag as an option to turn it off in case it misbehaves.
    experimental: bool,
}

impl ComputeMonitor {
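    /// Report Postgres as down: set the current-downtime gauge to the time since
    /// the last successful check and add the time elapsed since the previous
    /// check attempt to the cumulative downtime counter.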
    fn report_down(&self) {
        let now = Utc::now();

        // Calculate and report current downtime
        // (since the last time Postgres was up)
        let downtime = now.signed_duration_since(self.last_up);
        PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);

        // Calculate and update total downtime
        // (cumulative duration of Postgres downtime in ms)
        let inc = now
            .signed_duration_since(self.last_checked)
            .num_milliseconds();
        PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
    }

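    /// Report Postgres as up: remember the timestamp and reset the
    /// current-downtime gauge.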
    fn report_up(&mut self) {
        self.last_up = Utc::now();
        PG_CURR_DOWNTIME_MS.set(0.0);
    }

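    /// Format the current downtime metrics as a short string for log fields.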
    fn downtime_info(&self) -> String {
        format!(
            "total_ms: {}, current_ms: {}, last_up: {}",
            PG_TOTAL_DOWNTIME_MS.get(),
            PG_CURR_DOWNTIME_MS.get(),
            self.last_up
        )
    }

    /// Spin in a loop and figure out the last activity time in Postgres,
    /// then update it in the shared state. This function never errors out.
    /// NB: the only expected panic is at the `Mutex` unwrap(); all other errors
    /// should be handled gracefully.
    #[instrument(skip_all)]
    pub fn run(&mut self) {
        // Assume that `connstr` doesn't change.
        let connstr = self.compute.params.connstr.clone();
        let conf = self
            .compute
            .get_conn_conf(Some("compute_ctl:compute_monitor"));

        // During startup and configuration we connect to every Postgres database,
        // but we don't want to count that as user activity. So wait until the
        // compute has fully started before monitoring activity.
        wait_for_postgres_start(&self.compute);

        // Define `client` outside of the loop to reuse the existing connection if it's still active.
        let mut client = conf.connect(NoTls);

        info!("starting compute monitor for {}", connstr);

        loop {
            match &mut client {
                Ok(cli) => {
                    if cli.is_closed() {
                        info!(
                            downtime_info = self.downtime_info(),
                            "connection to Postgres is closed, trying to reconnect"
                        );
                        self.report_down();

                        // Connection is closed, reconnect and try again.
                        client = conf.connect(NoTls);
                    } else {
                        match self.check(cli) {
                            Ok(_) => {
                                self.report_up();
                                self.compute.update_last_active(self.last_active);
                            }
                            Err(e) => {
                                // Although there are many places in `check()` where we can return
                                // an error, normally it shouldn't happen: we will likely return an
                                // error only if the connection got broken, a query timed out,
                                // Postgres returned invalid data, etc. All such cases are
                                // suspicious, so report this as downtime.
                                self.report_down();
                                error!(
                                    downtime_info = self.downtime_info(),
                                    "could not check Postgres: {}", e
                                );

                                // Reconnect to Postgres just in case. During tests, I noticed
                                // that queries in `check()` can fail with `connection closed`,
                                // but `cli.is_closed()` above doesn't detect it. Even if the old
                                // connection is still alive, it will be dropped when we reassign
                                // `client` to a new connection.
                                client = conf.connect(NoTls);
                            }
                        }
                    }
                }
                Err(e) => {
                    info!(
                        downtime_info = self.downtime_info(),
                        "could not connect to Postgres: {}, retrying", e
                    );
                    self.report_down();

                    // Establish a new connection and try again.
                    client = conf.connect(NoTls);
                }
            }

            // Reset the `last_checked` timestamp and sleep before the next iteration.
            self.last_checked = Utc::now();
            thread::sleep(MONITOR_CHECK_INTERVAL);
        }
    }

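    /// Run one round of activity checks against Postgres and record any detected
    /// activity in `self.last_active`. An `Err` here means the checks themselves
    /// could not be performed (broken connection, failed query, unparsable
    /// result), not that Postgres is idle.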
    #[instrument(skip_all)]
    fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
        // This is new logic; only enable it if the feature flag is set.
        // TODO: remove this once we are sure that it works OR drop it altogether.
        if self.experimental {
            // Check whether the total active time or number of sessions across all databases
            // has changed. If it did, the user executed some queries. In theory, it can even
            // go down if some databases were dropped, but that's still user activity.
            match get_database_stats(cli) {
                Ok((active_time, sessions)) => {
                    let mut detected_activity = false;

                    if let Some(prev_active_time) = self.active_time {
                        if active_time != prev_active_time {
                            detected_activity = true;
                        }
                    }
                    self.active_time = Some(active_time);

                    if let Some(prev_sessions) = self.sessions {
                        if sessions != prev_sessions {
                            detected_activity = true;
                        }
                    }
                    self.sessions = Some(sessions);

                    if detected_activity {
                        // Update the last active time and return; we don't need to
                        // check the backends' state changes.
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("could not get database statistics: {}", e));
                }
            }
        }

        // If database statistics are the same, check all backends for state changes.
        // Maybe there are some with more recent activity. `get_backends_state_change()`
        // can return None or a stale timestamp, so it's `compute.update_last_active()`'s
        // responsibility to check whether the new timestamp is more recent than the
        // current one. This helps us discover new sessions that have not done anything yet.
        match get_backends_state_change(cli) {
            Ok(last_active) => match (last_active, self.last_active) {
                (Some(last_active), Some(prev_last_active)) => {
                    if last_active > prev_last_active {
                        self.last_active = Some(last_active);
                        return Ok(());
                    }
                }
                (Some(last_active), None) => {
                    self.last_active = Some(last_active);
                    return Ok(());
                }
                _ => {}
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "could not get backends state change: {}",
                    e
                ));
            }
        }

        // If there are existing (logical) walsenders, do not suspend.
        //
        // N.B. walproposer doesn't currently show up in pg_stat_replication,
        // but guard against it in case it ever does.
        const WS_COUNT_QUERY: &str =
            "select count(*) from pg_stat_replication where application_name != 'walproposer';";
        match cli.query_one(WS_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_ws) => {
                    if num_ws > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    let err: anyhow::Error = e.into();
                    return Err(err.context("failed to parse walsenders count"));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
            }
        }

        // Don't suspend the compute if there is an active logical replication subscription.
        //
        // `where pid is not null` filters out read-only computes and subscriptions on branches.
        const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
            "select count(*) from pg_stat_subscription where pid is not null;";
        match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
            Ok(row) => match row.try_get::<&str, i64>("count") {
                Ok(num_subscribers) => {
                    if num_subscribers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse 'pg_stat_subscription' count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of active logical replication subscriptions: {}",
                    e
                ));
            }
        }

        // Do not suspend the compute if autovacuum is running.
        const AUTOVACUUM_COUNT_QUERY: &str =
            "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
        match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_workers) => {
                    if num_workers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse autovacuum workers count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of autovacuum workers: {}",
                    e
                ));
            }
        }

        Ok(())
    }
}

// Hang on the condition variable, waiting until the compute status becomes `Running`.
fn wait_for_postgres_start(compute: &ComputeNode) {
    let mut state = compute.state.lock().unwrap();
    while state.status != ComputeStatus::Running {
        info!("compute is not running, waiting before monitoring activity");
        state = compute.state_changed.wait(state).unwrap();

        if state.status == ComputeStatus::Running {
            break;
        }
    }
}

// Figure out the total active time and number of sessions across all non-system databases.
// The returned tuple is `(active_time, sessions)`.
// It can return `0.0` active time or `0` sessions, which means either no user databases
// exist, or the compute started with skipped `pg_catalog` updates and the user hasn't
// executed any queries (or opened any sessions) yet.
fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
    // Filter out the `postgres` database, as `compute_ctl` and other monitoring tools
    // like `postgres_exporter` use it to query Postgres statistics.
    // Use explicit 8-byte type casts to match the Rust types.
    let stats = cli.query_one(
        "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
            coalesce(sum(sessions), 0)::bigint AS total_sessions
        FROM pg_stat_database
        WHERE datname NOT IN (
                'postgres',
                'template0',
                'template1'
        );",
        &[],
    );
    let stats = match stats {
        Ok(stats) => stats,
        Err(e) => {
            return Err(anyhow::anyhow!("could not query active_time: {}", e));
        }
    };

    let active_time: f64 = match stats.try_get("total_active_time") {
        Ok(active_time) => active_time,
        Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
    };

    let sessions: i64 = match stats.try_get("total_sessions") {
        Ok(sessions) => sessions,
        Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
    };

    Ok((active_time, sessions))
}

// Figure out the most recent state change time across all client backends.
// If there is a currently active backend, the timestamp will be `Utc::now()`.
// It can return `None`, which means that either no client backends exist or we
// were unable to parse the timestamp.
fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
    let mut last_active: Option<DateTime<Utc>> = None;
    // Get all running client backends except ourselves, using the RFC 3339 DateTime format.
    let backends = cli.query(
        "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
        FROM pg_stat_activity
        WHERE backend_type = 'client backend'
            AND pid != pg_backend_pid()
            AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
        &[],
    );

    match backends {
        Ok(backs) => {
            let mut idle_backs: Vec<DateTime<Utc>> = vec![];

            for b in backs.into_iter() {
                let state: String = match b.try_get("state") {
                    Ok(state) => state,
                    Err(_) => continue,
                };

                if state == "idle" {
                    let change: String = match b.try_get("state_change") {
                        Ok(state_change) => state_change,
                        Err(_) => continue,
                    };
                    let change = DateTime::parse_from_rfc3339(&change);
                    match change {
                        Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
                        Err(e) => {
                            info!("cannot parse backend state_change DateTime: {}", e);
                            continue;
                        }
                    }
                } else {
                    // Found a non-idle backend, so the last activity is NOW.
                    // Return immediately, no need to check the other backends.
                    return Ok(Some(Utc::now()));
                }
            }

            // Get the idle backend `state_change` with the max timestamp.
            if let Some(last) = idle_backs.iter().max() {
                last_active = Some(*last);
            }
        }
        Err(e) => {
            return Err(anyhow::anyhow!("could not query backends: {}", e));
        }
    }

    Ok(last_active)
}

/// Launch a separate compute monitor thread and return its `JoinHandle`.
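///
/// The monitor loop in [`ComputeMonitor::run`] never returns, so the returned
/// handle is not normally joined. A minimal usage sketch (hypothetical caller,
/// assuming an already-constructed `Arc<ComputeNode>` named `compute`):
///
/// ```ignore
/// let _monitor_handle = launch_monitor(&compute);
/// ```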
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
    let compute = Arc::clone(compute);
    let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
    let now = Utc::now();
    let mut monitor = ComputeMonitor {
        compute,
        last_active: None,
        last_checked: now,
        last_up: now,
        active_time: None,
        sessions: None,
        experimental,
    };

    thread::Builder::new()
        .name("compute-monitor".into())
        .spawn(move || {
            let span = span!(Level::INFO, "compute_monitor");
            let _enter = span.enter();
            monitor.run();
        })
        .expect("cannot launch compute monitor thread")
}