Line data Source code
1 : use std::sync::Arc;
2 : use std::thread;
3 : use std::time::Duration;
4 :
5 : use chrono::{DateTime, Utc};
6 : use compute_api::responses::ComputeStatus;
7 : use compute_api::spec::ComputeFeature;
8 : use postgres::{Client, NoTls};
9 : use tracing::{Level, error, info, instrument, span};
10 :
11 : use crate::compute::ComputeNode;
12 : use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
13 :
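// Default `pg_init_timeout` applied by `wait_for_postgres_start()` below when the
// compute params don't specify one explicitly.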
14 : const PG_DEFAULT_INIT_TIMEOUT: Duration = Duration::from_secs(60);
15 : const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
16 :
17 : /// Struct to store runtime state of the compute monitor thread.
18 : /// In theory, this could be a part of `ComputeNode`, but i)
19 : /// this state is expected to be accessed by only a single thread,
20 : /// so we don't need to care about locking; and ii) `ComputeNode` is
21 : /// already quite big. Thus, it seems to be a good idea to keep
22 : /// all the activity/health monitoring parts here.
23 : struct ComputeMonitor {
24 : compute: Arc<ComputeNode>,
25 :
26 : /// The last moment when Postgres had some activity
27 : /// that should prevent the compute from being suspended.
28 : last_active: Option<DateTime<Utc>>,
29 :
30 : /// The moment when we last tried to check Postgres.
31 : last_checked: DateTime<Utc>,
32 : /// The last moment we did a successful Postgres check.
33 : last_up: DateTime<Utc>,
34 :
35 : /// Only used for internal statistics change tracking
36 : /// between monitor runs and can be outdated.
37 : active_time: Option<f64>,
38 : /// Only used for internal statistics change tracking
39 : /// between monitor runs and can be outdated.
40 : sessions: Option<i64>,
41 :
42 : /// Use the experimental statistics-based activity monitor. It's no longer
43 : /// 'experimental' per se, as it's enabled for everyone, but we still
44 : /// keep the flag as an option to turn it off in some cases if it
45 : /// misbehaves.
46 : experimental: bool,
47 : }
48 :
49 : impl ComputeMonitor {
50 0 : fn report_down(&self) {
51 0 : let now = Utc::now();
52 :
53 : // Calculate and report current downtime
54 : // (since the last time Postgres was up)
55 0 : let downtime = now.signed_duration_since(self.last_up);
56 0 : PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
57 :
58 : // Calculate and update total downtime
59 : // (cumulative duration of Postgres downtime in ms)
60 0 : let inc = now
61 0 : .signed_duration_since(self.last_checked)
62 0 : .num_milliseconds();
63 0 : PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
64 0 : }
65 :
66 0 : fn report_up(&mut self) {
67 0 : self.last_up = Utc::now();
68 0 : PG_CURR_DOWNTIME_MS.set(0.0);
69 0 : }
70 :
71 0 : fn downtime_info(&self) -> String {
72 0 : format!(
73 0 : "total_ms: {}, current_ms: {}, last_up: {}",
74 0 : PG_TOTAL_DOWNTIME_MS.get(),
75 0 : PG_CURR_DOWNTIME_MS.get(),
76 : self.last_up
77 : )
78 0 : }
79 :
80 : /// Check if compute is in some terminal or soon-to-be-terminal
81 : /// state, then return `true`, signalling the caller that it
82 : /// should exit gracefully. Otherwise, return `false`.
83 0 : fn check_interrupts(&mut self) -> bool {
84 0 : let compute_status = self.compute.get_status();
85 0 : if matches!(
86 0 : compute_status,
87 : ComputeStatus::Terminated
88 : | ComputeStatus::TerminationPendingFast
89 : | ComputeStatus::TerminationPendingImmediate
90 : | ComputeStatus::Failed
91 : ) {
92 0 : info!(
93 0 : "compute is in {} status, stopping compute monitor",
94 : compute_status
95 : );
96 0 : return true;
97 0 : }
98 :
99 0 : false
100 0 : }
101 :
102 : /// Spin in a loop and figure out the last activity time in Postgres,
103 : /// then update it in the shared state. This function currently never
104 : /// errors out explicitly, but there is a graceful termination path.
105 : /// Every time we receive an error trying to check Postgres, we call
106 : /// [`ComputeMonitor::check_interrupts()`], because the compute could
107 : /// already be terminating; in that case we can exit gracefully
108 : /// to avoid producing error noise in the logs.
109 : /// NB: the only expected panic is at the `Mutex` unwrap(); all other
110 : /// errors should be handled gracefully.
111 : #[instrument(skip_all)]
112 : pub fn run(&mut self) -> anyhow::Result<()> {
113 : // Assume that `connstr` doesn't change.
114 : let connstr = self.compute.params.connstr.clone();
115 : let conf = self
116 : .compute
117 : .get_conn_conf(Some("compute_ctl:compute_monitor"));
118 :
119 : // During startup and configuration we connect to every Postgres database,
120 : // but we don't want to count this as user activity. So wait until
121 : // the compute has fully started before monitoring activity.
122 : wait_for_postgres_start(&self.compute);
123 :
124 : // Define `client` outside of the loop to reuse existing connection if it's active.
125 : let mut client = conf.connect(NoTls);
126 :
127 : info!("starting compute monitor for {}", connstr);
128 :
129 : loop {
130 : if self.check_interrupts() {
131 : break;
132 : }
133 :
134 : match &mut client {
135 : Ok(cli) => {
136 : if cli.is_closed() {
137 : info!(
138 : downtime_info = self.downtime_info(),
139 : "connection to Postgres is closed, trying to reconnect"
140 : );
141 : if self.check_interrupts() {
142 : break;
143 : }
144 :
145 : self.report_down();
146 :
147 : // Connection is closed, reconnect and try again.
148 : client = conf.connect(NoTls);
149 : } else {
150 : match self.check(cli) {
151 : Ok(_) => {
152 : self.report_up();
153 : self.compute.update_last_active(self.last_active);
154 : }
155 : Err(e) => {
156 : error!(
157 : downtime_info = self.downtime_info(),
158 : "could not check Postgres: {}", e
159 : );
160 : if self.check_interrupts() {
161 : break;
162 : }
163 :
164 : // Although there are many places where `check()` can return an error,
165 : // normally it shouldn't happen. I.e., we will likely return an error if
166 : // the connection got broken, a query timed out, Postgres returned invalid data, etc.
167 : // All such cases are suspicious, so let's report this as downtime.
168 : self.report_down();
169 :
170 : // Reconnect to Postgres just in case. During tests, I noticed
171 : // that queries in `check()` can fail with `connection closed`,
172 : // but `cli.is_closed()` above doesn't detect it. Even if old
173 : // connection is still alive, it will be dropped when we reassign
174 : // `client` to a new connection.
175 : client = conf.connect(NoTls);
176 : }
177 : }
178 : }
179 : }
180 : Err(e) => {
181 : info!(
182 : downtime_info = self.downtime_info(),
183 : "could not connect to Postgres: {}, retrying", e
184 : );
185 : if self.check_interrupts() {
186 : break;
187 : }
188 :
189 : self.report_down();
190 :
191 : // Establish a new connection and try again.
192 : client = conf.connect(NoTls);
193 : }
194 : }
195 :
196 : // Reset the `last_checked` timestamp and sleep before the next iteration.
197 : self.last_checked = Utc::now();
198 : thread::sleep(MONITOR_CHECK_INTERVAL);
199 : }
200 :
201 : // Graceful termination path
202 : Ok(())
203 : }
204 :
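/// Probe Postgres once and decide whether there was any recent user activity.
/// A short summary of the checks performed below, in order:
///   1. (only with the `experimental` flag) deltas of total `active_time` and
///      `sessions` from `pg_stat_database`;
///   2. client backend state changes from `pg_stat_activity`;
///   3. existing (logical) walsenders in `pg_stat_replication`;
///   4. active logical replication subscriptions in `pg_stat_subscription`;
///   5. running autovacuum workers in `pg_stat_activity`.
/// If any of these indicate activity, `self.last_active` is updated; any failed
/// query is returned as an error so that the caller can report downtime.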
205 : #[instrument(skip_all)]
206 : fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
207 : // This is new logic, only enabled if the feature flag is set.
208 : // TODO: remove this once we are sure that it works OR drop it altogether.
209 : if self.experimental {
210 : // Check whether the total active time or sessions across all databases have changed.
211 : // If they did, it means that the user executed some queries. In theory, they can even go down if
212 : // some databases were dropped, but that's still user activity.
213 : match get_database_stats(cli) {
214 : Ok((active_time, sessions)) => {
215 : let mut detected_activity = false;
216 :
217 : if let Some(prev_active_time) = self.active_time {
218 : if active_time != prev_active_time {
219 : detected_activity = true;
220 : }
221 : }
222 : self.active_time = Some(active_time);
223 :
224 : if let Some(prev_sessions) = self.sessions {
225 : if sessions != prev_sessions {
226 : detected_activity = true;
227 : }
228 : }
229 : self.sessions = Some(sessions);
230 :
231 : if detected_activity {
232 : // Update the last active time and continue, we don't need to
233 : // check backends state change.
234 : self.last_active = Some(Utc::now());
235 : return Ok(());
236 : }
237 : }
238 : Err(e) => {
239 : return Err(anyhow::anyhow!("could not get database statistics: {}", e));
240 : }
241 : }
242 : }
243 :
244 : // If database statistics are the same, check all backends for state changes.
245 : // Maybe there are some with more recent activity. `get_backends_state_change()`
246 : // can return None or a stale timestamp, so it's the responsibility of `compute.update_last_active()`
247 : // to check whether the new timestamp is more recent than the current one.
248 : // This helps us discover new sessions that have not done anything yet.
249 : match get_backends_state_change(cli) {
250 : Ok(last_active) => match (last_active, self.last_active) {
251 : (Some(last_active), Some(prev_last_active)) => {
252 : if last_active > prev_last_active {
253 : self.last_active = Some(last_active);
254 : return Ok(());
255 : }
256 : }
257 : (Some(last_active), None) => {
258 : self.last_active = Some(last_active);
259 : return Ok(());
260 : }
261 : _ => {}
262 : },
263 : Err(e) => {
264 : return Err(anyhow::anyhow!(
265 : "could not get backends state change: {}",
266 : e
267 : ));
268 : }
269 : }
270 :
271 : // If there are existing (logical) walsenders, do not suspend.
272 : //
273 : // N.B. walproposer doesn't currently show up in pg_stat_replication,
274 : // but guard against it in case it ever does.
275 : const WS_COUNT_QUERY: &str =
276 : "select count(*) from pg_stat_replication where application_name != 'walproposer';";
277 : match cli.query_one(WS_COUNT_QUERY, &[]) {
278 : Ok(r) => match r.try_get::<&str, i64>("count") {
279 : Ok(num_ws) => {
280 : if num_ws > 0 {
281 : self.last_active = Some(Utc::now());
282 : return Ok(());
283 : }
284 : }
285 : Err(e) => {
286 : let err: anyhow::Error = e.into();
287 : return Err(err.context("failed to parse walsenders count"));
288 : }
289 : },
290 : Err(e) => {
291 : return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
292 : }
293 : }
294 :
295 : // Don't suspend compute if there is an active logical replication subscription
296 : //
297 : // `where pid is not null` filters out read-only computes and subscriptions on branches.
298 : const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
299 : "select count(*) from pg_stat_subscription where pid is not null;";
300 : match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
301 : Ok(row) => match row.try_get::<&str, i64>("count") {
302 : Ok(num_subscribers) => {
303 : if num_subscribers > 0 {
304 : self.last_active = Some(Utc::now());
305 : return Ok(());
306 : }
307 : }
308 : Err(e) => {
309 : return Err(anyhow::anyhow!(
310 : "failed to parse 'pg_stat_subscription' count: {}",
311 : e
312 : ));
313 : }
314 : },
315 : Err(e) => {
316 : return Err(anyhow::anyhow!(
317 : "failed to get list of active logical replication subscriptions: {}",
318 : e
319 : ));
320 : }
321 : }
322 :
323 : // Do not suspend compute if autovacuum is running
324 : const AUTOVACUUM_COUNT_QUERY: &str =
325 : "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
326 : match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
327 : Ok(r) => match r.try_get::<&str, i64>("count") {
328 : Ok(num_workers) => {
329 : if num_workers > 0 {
330 : self.last_active = Some(Utc::now());
331 : return Ok(());
332 : };
333 : }
334 : Err(e) => {
335 : return Err(anyhow::anyhow!(
336 : "failed to parse autovacuum workers count: {}",
337 : e
338 : ));
339 : }
340 : },
341 : Err(e) => {
342 : return Err(anyhow::anyhow!(
343 : "failed to get list of autovacuum workers: {}",
344 : e
345 : ));
346 : }
347 : }
348 :
349 : Ok(())
350 : }
351 : }
352 :
353 : // Wait on the condition variable until the compute status is `Running`.
354 0 : fn wait_for_postgres_start(compute: &ComputeNode) {
355 0 : let mut state = compute.state.lock().unwrap();
356 0 : let pg_init_timeout = compute
357 0 : .params
358 0 : .pg_init_timeout
359 0 : .unwrap_or(PG_DEFAULT_INIT_TIMEOUT);
360 :
361 0 : while state.status != ComputeStatus::Running {
362 0 : info!("compute is not running, waiting before monitoring activity");
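// Two waiting strategies (summarizing the code below):
// - without `lakebase_mode`: block on the condvar until the status changes, with no deadline;
// - with `lakebase_mode`: poll with a 5s condvar timeout and exit the whole process
//   if Postgres hasn't reached `Running` within `pg_init_timeout`.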
363 0 : if !compute.params.lakebase_mode {
364 0 : state = compute.state_changed.wait(state).unwrap();
365 :
366 0 : if state.status == ComputeStatus::Running {
367 0 : break;
368 0 : }
369 0 : continue;
370 0 : }
371 :
372 0 : if state.pg_start_time.is_some()
373 0 : && Utc::now()
374 0 : .signed_duration_since(state.pg_start_time.unwrap())
375 0 : .to_std()
376 0 : .unwrap_or_default()
377 0 : > pg_init_timeout
378 : {
379 : // If Postgres isn't up and running with working PS/SK connections within `pg_init_timeout`, it is
380 : // possible that we started Postgres with a wrong spec (so it is talking to the wrong PS/SK nodes). To prevent
381 : // dead ends, we simply exit the compute node so it can restart with the latest spec.
382 : //
383 : // NB: We skip this check if we have not attempted to start PG yet (indicated by `state.pg_start_time` being `None`).
384 : // This is to make sure that more appropriate errors are surfaced if we encounter issues before we even attempt
385 : // to start PG (e.g., if we can't pull the spec, can't sync safekeepers, or can't get the basebackup).
386 0 : error!(
387 0 : "compute did not enter Running state in {} seconds, exiting",
388 0 : pg_init_timeout.as_secs()
389 : );
390 0 : std::process::exit(1);
391 0 : }
392 0 : state = compute
393 0 : .state_changed
394 0 : .wait_timeout(state, Duration::from_secs(5))
395 0 : .unwrap()
396 0 : .0;
397 : }
398 0 : }
399 :
400 : // Figure out the total active time and sessions across all non-system databases.
401 : // The returned tuple is `(active_time, sessions)`.
402 : // It can return `0.0` active time or `0` sessions, which means either no user databases exist OR
403 : // it was a start with skipped `pg_catalog` updates and the user hasn't run any queries
404 : // (or opened any sessions) yet.
405 0 : fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
406 : // Filter out the `postgres` database, as `compute_ctl` and other monitoring tools
407 : // like `postgres_exporter` use it to query Postgres statistics.
408 : // Use explicit 8-byte type casts to match the Rust types.
409 0 : let stats = cli.query_one(
410 0 : "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
411 0 : coalesce(sum(sessions), 0)::bigint AS total_sessions
412 0 : FROM pg_stat_database
413 0 : WHERE datname NOT IN (
414 0 : 'postgres',
415 0 : 'template0',
416 0 : 'template1'
417 0 : );",
418 0 : &[],
419 : );
420 0 : let stats = match stats {
421 0 : Ok(stats) => stats,
422 0 : Err(e) => {
423 0 : return Err(anyhow::anyhow!("could not query active_time: {}", e));
424 : }
425 : };
426 :
427 0 : let active_time: f64 = match stats.try_get("total_active_time") {
428 0 : Ok(active_time) => active_time,
429 0 : Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
430 : };
431 :
432 0 : let sessions: i64 = match stats.try_get("total_sessions") {
433 0 : Ok(sessions) => sessions,
434 0 : Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
435 : };
436 :
437 0 : Ok((active_time, sessions))
438 0 : }
439 :
440 : // Figure out the most recent state change time across all client backends.
441 : // If there is a currently active backend, the timestamp will be `Utc::now()`.
442 : // It can return `None`, which means no client backends exist or we were
443 : // unable to parse the timestamp.
444 0 : fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
445 0 : let mut last_active: Option<DateTime<Utc>> = None;
446 : // Get all running client backends except ourself, use RFC3339 DateTime format.
447 0 : let backends = cli.query(
448 0 : "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
449 0 : FROM pg_stat_activity
450 0 : WHERE backend_type = 'client backend'
451 0 : AND pid != pg_backend_pid()
452 0 : AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
453 0 : &[],
454 : );
455 :
456 0 : match backends {
457 0 : Ok(backs) => {
458 0 : let mut idle_backs: Vec<DateTime<Utc>> = vec![];
459 :
460 0 : for b in backs.into_iter() {
461 0 : let state: String = match b.try_get("state") {
462 0 : Ok(state) => state,
463 0 : Err(_) => continue,
464 : };
465 :
466 0 : if state == "idle" {
467 0 : let change: String = match b.try_get("state_change") {
468 0 : Ok(state_change) => state_change,
469 0 : Err(_) => continue,
470 : };
471 0 : let change = DateTime::parse_from_rfc3339(&change);
472 0 : match change {
473 0 : Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
474 0 : Err(e) => {
475 0 : info!("cannot parse backend state_change DateTime: {}", e);
476 0 : continue;
477 : }
478 : }
479 : } else {
480 : // Found non-idle backend, so the last activity is NOW.
481 : // Return immediately, no need to check other backends.
482 0 : return Ok(Some(Utc::now()));
483 : }
484 : }
485 :
486 : // Get idle backend `state_change` with the max timestamp.
487 0 : if let Some(last) = idle_backs.iter().max() {
488 0 : last_active = Some(*last);
489 0 : }
490 : }
491 0 : Err(e) => {
492 0 : return Err(anyhow::anyhow!("could not query backends: {}", e));
493 : }
494 : }
495 :
496 0 : Ok(last_active)
497 0 : }
498 :
499 : /// Launch a separate compute monitor thread and return its `JoinHandle`.
500 0 : pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
501 0 : let compute = Arc::clone(compute);
502 0 : let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
503 0 : let now = Utc::now();
504 0 : let mut monitor = ComputeMonitor {
505 0 : compute,
506 0 : last_active: None,
507 0 : last_checked: now,
508 0 : last_up: now,
509 0 : active_time: None,
510 0 : sessions: None,
511 0 : experimental,
512 0 : };
513 :
514 0 : thread::Builder::new()
515 0 : .name("compute-monitor".into())
516 0 : .spawn(move || {
517 0 : let span = span!(Level::INFO, "compute_monitor");
518 0 : let _enter = span.enter();
519 0 : match monitor.run() {
520 0 : Ok(_) => info!("compute monitor thread terminated gracefully"),
521 0 : Err(err) => error!("compute monitor thread terminated abnormally {:?}", err),
522 : }
523 0 : })
524 0 : .expect("cannot launch compute monitor thread")
525 0 : }
|