use std::sync::Arc;
use std::thread;
use std::time::Duration;

use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};

use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};

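/// How often the monitor wakes up to check Postgres for liveness and activity.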
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);

/// Struct to store the runtime state of the compute monitor thread.
/// In theory, this could be a part of `Compute`, but i) this state is
/// expected to be accessed by a single thread only, so we don't need
/// to care about locking; and ii) `Compute` is already quite big.
/// Thus, it seems like a good idea to keep all the activity/health
/// monitoring parts here.
struct ComputeMonitor {
    compute: Arc<ComputeNode>,

    /// The last moment when Postgres had some activity
    /// that should prevent the compute from being suspended.
    last_active: Option<DateTime<Utc>>,

    /// The moment when we last tried to check Postgres.
    last_checked: DateTime<Utc>,
    /// The last moment we did a successful Postgres check.
    last_up: DateTime<Utc>,

    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    active_time: Option<f64>,
    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    sessions: Option<i64>,

    /// Use the experimental statistics-based activity monitor. It's no longer
    /// 'experimental' per se, as it's enabled for everyone, but we still
    /// keep the flag as an option to turn it off in case it misbehaves.
    experimental: bool,
}

impl ComputeMonitor {
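    /// Report Postgres as currently down: update the current-downtime gauge
    /// with the time since the last successful check and add the time since
    /// the previous check attempt to the cumulative downtime counter.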
    fn report_down(&self) {
        let now = Utc::now();

        // Calculate and report current downtime
        // (since the last time Postgres was up)
        let downtime = now.signed_duration_since(self.last_up);
        PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);

        // Calculate and update total downtime
        // (cumulative duration of Postgres downtime in ms)
        let inc = now
            .signed_duration_since(self.last_checked)
            .num_milliseconds();
        PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
    }

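    /// Record a successful check: refresh `last_up` and reset the
    /// current-downtime gauge back to zero.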
    fn report_up(&mut self) {
        self.last_up = Utc::now();
        PG_CURR_DOWNTIME_MS.set(0.0);
    }

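    /// Render the downtime counters as a short string suitable for
    /// attaching to log messages.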
    fn downtime_info(&self) -> String {
        format!(
            "total_ms: {}, current_ms: {}, last_up: {}",
            PG_TOTAL_DOWNTIME_MS.get(),
            PG_CURR_DOWNTIME_MS.get(),
            self.last_up
        )
    }

    /// Check whether the compute is in some terminal or soon-to-be-terminal
    /// state; if so, return `true`, signalling the caller that it should
    /// exit gracefully. Otherwise, return `false`.
    fn check_interrupts(&mut self) -> bool {
        let compute_status = self.compute.get_status();
        if matches!(
            compute_status,
            ComputeStatus::Terminated | ComputeStatus::TerminationPending | ComputeStatus::Failed
        ) {
            info!(
                "compute is in {} status, stopping compute monitor",
                compute_status
            );
            return true;
        }

        false
    }

    /// Spin in a loop and figure out the last activity time in Postgres.
    /// Then update it in the shared state. This function currently never
    /// errors out explicitly, but there is a graceful termination path.
    /// Every time we receive an error while checking Postgres, we call
    /// [`ComputeMonitor::check_interrupts()`] because the compute could
    /// already be terminating, in which case we can exit gracefully
    /// instead of producing error noise in the log.
    /// NB: the only expected panic is at the `Mutex` unwrap(); all other
    /// errors should be handled gracefully.
    #[instrument(skip_all)]
    pub fn run(&mut self) -> anyhow::Result<()> {
        // Assume that `connstr` doesn't change.
        let connstr = self.compute.params.connstr.clone();
        let conf = self
            .compute
            .get_conn_conf(Some("compute_ctl:compute_monitor"));

        // During startup and configuration we connect to every Postgres database,
        // but we don't want to count this as user activity. So wait until
        // the compute has fully started before monitoring activity.
        wait_for_postgres_start(&self.compute);

        // Define `client` outside of the loop to reuse the existing connection if it's active.
        let mut client = conf.connect(NoTls);

        info!("starting compute monitor for {}", connstr);

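        // Main monitoring loop: on every iteration we either reuse the existing
        // connection, reconnect if it turns out to be broken, or record downtime
        // if we cannot connect at all.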
        loop {
            if self.check_interrupts() {
                break;
            }

            match &mut client {
                Ok(cli) => {
                    if cli.is_closed() {
                        info!(
                            downtime_info = self.downtime_info(),
                            "connection to Postgres is closed, trying to reconnect"
                        );
                        if self.check_interrupts() {
                            break;
                        }

                        self.report_down();

                        // Connection is closed, reconnect and try again.
                        client = conf.connect(NoTls);
                    } else {
                        match self.check(cli) {
                            Ok(_) => {
                                self.report_up();
                                self.compute.update_last_active(self.last_active);
                            }
                            Err(e) => {
                                error!(
                                    downtime_info = self.downtime_info(),
                                    "could not check Postgres: {}", e
                                );
                                if self.check_interrupts() {
                                    break;
                                }

                                // Although there are many places in `check()` where we can
                                // return an error, normally it shouldn't happen: we only
                                // return an error if the connection broke, a query timed out,
                                // Postgres returned invalid data, etc. All such cases are
                                // suspicious, so let's report this as downtime.
                                self.report_down();

                                // Reconnect to Postgres just in case. During tests, I noticed
                                // that queries in `check()` can fail with `connection closed`,
                                // but `cli.is_closed()` above doesn't detect it. Even if the old
                                // connection is still alive, it will be dropped when we reassign
                                // `client` to a new connection.
                                client = conf.connect(NoTls);
                            }
                        }
                    }
                }
                Err(e) => {
                    info!(
                        downtime_info = self.downtime_info(),
                        "could not connect to Postgres: {}, retrying", e
                    );
                    if self.check_interrupts() {
                        break;
                    }

                    self.report_down();

                    // Establish a new connection and try again.
                    client = conf.connect(NoTls);
                }
            }

            // Reset the `last_checked` timestamp and sleep before the next iteration.
            self.last_checked = Utc::now();
            thread::sleep(MONITOR_CHECK_INTERVAL);
        }

        // Graceful termination path
        Ok(())
    }

    #[instrument(skip_all)]
    fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
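        // The checks below run in order: per-database statistics (if enabled),
        // backend state changes, walsenders, logical replication subscriptions,
        // and finally autovacuum workers. Whichever detects activity first
        // updates `last_active` and returns early.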
        // This is new logic, only enabled if the feature flag is set.
        // TODO: remove this once we are sure that it works OR drop it altogether.
        if self.experimental {
            // Check if the total active time or sessions across all databases has changed.
            // If it did, it means that the user executed some queries. In theory, the counters
            // can even go down if some databases were dropped, but that's still user activity.
            match get_database_stats(cli) {
                Ok((active_time, sessions)) => {
                    let mut detected_activity = false;

                    if let Some(prev_active_time) = self.active_time {
                        if active_time != prev_active_time {
                            detected_activity = true;
                        }
                    }
                    self.active_time = Some(active_time);

                    if let Some(prev_sessions) = self.sessions {
                        if sessions != prev_sessions {
                            detected_activity = true;
                        }
                    }
                    self.sessions = Some(sessions);

                    if detected_activity {
                        // Update the last active time and continue, we don't need to
                        // check the backends' state change.
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("could not get database statistics: {}", e));
                }
            }
        }

        // If the database statistics are the same, check all backends for state changes.
        // Maybe there are some with more recent activity. `get_backends_state_change()`
        // can return `None` or a stale timestamp, so it is `compute.update_last_active()`'s
        // responsibility to check whether the new timestamp is more recent than the current one.
        // This helps us to discover new sessions that have not done anything yet.
        match get_backends_state_change(cli) {
            Ok(last_active) => match (last_active, self.last_active) {
                (Some(last_active), Some(prev_last_active)) => {
                    if last_active > prev_last_active {
                        self.last_active = Some(last_active);
                        return Ok(());
                    }
                }
                (Some(last_active), None) => {
                    self.last_active = Some(last_active);
                    return Ok(());
                }
                _ => {}
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "could not get backends state change: {}",
                    e
                ));
            }
        }

        // If there are existing (logical) walsenders, do not suspend.
        //
        // N.B. walproposer doesn't currently show up in pg_stat_replication,
        // but guard against it in case it ever does.
        const WS_COUNT_QUERY: &str =
            "select count(*) from pg_stat_replication where application_name != 'walproposer';";
        match cli.query_one(WS_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_ws) => {
                    if num_ws > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    let err: anyhow::Error = e.into();
                    return Err(err.context("failed to parse walsenders count"));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
            }
        }

        // Don't suspend the compute if there is an active logical replication subscription.
        //
        // `where pid is not null` – to filter out read-only computes and subscriptions on branches.
        const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
            "select count(*) from pg_stat_subscription where pid is not null;";
        match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
            Ok(row) => match row.try_get::<&str, i64>("count") {
                Ok(num_subscribers) => {
                    if num_subscribers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse 'pg_stat_subscription' count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of active logical replication subscriptions: {}",
                    e
                ));
            }
        }

        // Do not suspend the compute if autovacuum is running.
        const AUTOVACUUM_COUNT_QUERY: &str =
            "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
        match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_workers) => {
                    if num_workers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse autovacuum workers count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of autovacuum workers: {}",
                    e
                ));
            }
        }

        Ok(())
    }
}

// Block on the condition variable until the compute status is `Running`.
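// The status is re-checked after every wakeup because condition variable waits
// can wake spuriously and the status may have changed to something other than
// `Running` in the meantime.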
fn wait_for_postgres_start(compute: &ComputeNode) {
    let mut state = compute.state.lock().unwrap();
    while state.status != ComputeStatus::Running {
        info!("compute is not running, waiting before monitoring activity");
        state = compute.state_changed.wait(state).unwrap();

        if state.status == ComputeStatus::Running {
            break;
        }
    }
}

// Figure out the total active time and sessions across all non-system databases.
// The returned tuple is `(active_time, sessions)`.
// It can return `0.0` active time or `0` sessions, which means that no user databases
// exist, or that this was a start with skipped `pg_catalog` updates and the user hasn't
// run any queries (or opened any sessions) yet.
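// For example, a compute with a single user database that has served one short
// session might return something like `(12.5, 1)`; the absolute values are
// illustrative and don't matter here, only whether they changed between runs.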
fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
    // Filter out the `postgres` database, as `compute_ctl` and other monitoring tools
    // like `postgres_exporter` use it to query Postgres statistics.
    // Use explicit 8-byte type casts to match the Rust types.
    let stats = cli.query_one(
        "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
            coalesce(sum(sessions), 0)::bigint AS total_sessions
        FROM pg_stat_database
        WHERE datname NOT IN (
            'postgres',
            'template0',
            'template1'
        );",
        &[],
    );
    let stats = match stats {
        Ok(stats) => stats,
        Err(e) => {
            return Err(anyhow::anyhow!("could not query active_time: {}", e));
        }
    };

    let active_time: f64 = match stats.try_get("total_active_time") {
        Ok(active_time) => active_time,
        Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
    };

    let sessions: i64 = match stats.try_get("total_sessions") {
        Ok(sessions) => sessions,
        Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
    };

    Ok((active_time, sessions))
}

// Figure out the most recent state change time across all client backends.
// If there is a currently active backend, the timestamp will be `Utc::now()`.
// It can return `None`, which means that no client backends exist or that we
// were unable to parse the timestamp.
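// The `state_change` column is formatted in Postgres with
// `to_char(state_change, 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')`, producing timestamps
// like `2024-05-01T12:34:56.789012Z`, which `DateTime::parse_from_rfc3339` accepts.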
fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
    let mut last_active: Option<DateTime<Utc>> = None;
    // Get all running client backends except ourselves, using the RFC 3339 DateTime format.
    let backends = cli.query(
        "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
        FROM pg_stat_activity
            WHERE backend_type = 'client backend'
                AND pid != pg_backend_pid()
                AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
        &[],
    );

    match backends {
        Ok(backs) => {
            let mut idle_backs: Vec<DateTime<Utc>> = vec![];

            for b in backs.into_iter() {
                let state: String = match b.try_get("state") {
                    Ok(state) => state,
                    Err(_) => continue,
                };

                if state == "idle" {
                    let change: String = match b.try_get("state_change") {
                        Ok(state_change) => state_change,
                        Err(_) => continue,
                    };
                    let change = DateTime::parse_from_rfc3339(&change);
                    match change {
                        Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
                        Err(e) => {
                            info!("cannot parse backend state_change DateTime: {}", e);
                            continue;
                        }
                    }
                } else {
                    // Found a non-idle backend, so the last activity is NOW.
                    // Return immediately, no need to check other backends.
                    return Ok(Some(Utc::now()));
                }
            }

            // Get the idle backend `state_change` with the max timestamp.
            if let Some(last) = idle_backs.iter().max() {
                last_active = Some(*last);
            }
        }
        Err(e) => {
            return Err(anyhow::anyhow!("could not query backends: {}", e));
        }
    }

    Ok(last_active)
}

/// Launch a separate compute monitor thread and return its `JoinHandle`.
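/// The monitor state itself is private to this module; the spawned thread
/// communicates with the rest of the system only through the shared
/// `ComputeNode` (e.g. `update_last_active()`), the exported metrics, and logs.
///
/// A minimal usage sketch (assuming an already configured `Arc<ComputeNode>`):
///
/// ```ignore
/// let handle = launch_monitor(&compute);
/// // The monitor runs until the compute reaches a terminal status;
/// // the handle can be joined during shutdown if desired.
/// ```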
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
    let compute = Arc::clone(compute);
    let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
    let now = Utc::now();
    let mut monitor = ComputeMonitor {
        compute,
        last_active: None,
        last_checked: now,
        last_up: now,
        active_time: None,
        sessions: None,
        experimental,
    };

    thread::Builder::new()
        .name("compute-monitor".into())
        .spawn(move || {
            let span = span!(Level::INFO, "compute_monitor");
            let _enter = span.enter();
            match monitor.run() {
                Ok(_) => info!("compute monitor thread terminated gracefully"),
                Err(err) => error!("compute monitor thread terminated abnormally {:?}", err),
            }
        })
        .expect("cannot launch compute monitor thread")
}