LCOV - code coverage report
Current view: top level - compute_tools/src - monitor.rs (source / functions) Coverage Total Hit
Test: 17080b14f46954d6812ea0a7dad4b2247e0840a8.info Lines: 0.0 % 119 0
Test Date: 2025-07-08 18:30:10 Functions: 0.0 % 9 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::thread;
       3              : use std::time::Duration;
       4              : 
       5              : use chrono::{DateTime, Utc};
       6              : use compute_api::responses::ComputeStatus;
       7              : use compute_api::spec::ComputeFeature;
       8              : use postgres::{Client, NoTls};
       9              : use tracing::{Level, error, info, instrument, span};
      10              : 
      11              : use crate::compute::ComputeNode;
      12              : use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
      13              : 
                        /// Pause between two consecutive iterations of the monitor loop in
                        /// `ComputeMonitor::run()`.
      14              : const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
      15              : 
      16              : /// Struct to store runtime state of the compute monitor thread.
      17              : /// In theory, this could be a part of `ComputeNode`, but i)
      18              : /// this state is expected to be accessed only by a single thread,
      19              : /// so we don't need to care about locking; ii) `ComputeNode` is
      20              : /// already quite big. Thus, it seems to be a good idea to keep
      21              : /// all the activity/health monitoring parts here.
      22              : struct ComputeMonitor {
                            /// Shared handle to the compute node being monitored.
      23              :     compute: Arc<ComputeNode>,
      24              : 
      25              :     /// The moment when Postgres had some activity,
      26              :     /// that should prevent compute from being suspended.
                            /// Passed to `compute.update_last_active()` after every successful check.
      27              :     last_active: Option<DateTime<Utc>>,
      28              : 
      29              :     /// The moment when we last tried to check Postgres.
                            /// Reset at the end of every monitor loop iteration; the time elapsed
                            /// since it is what gets added to the total downtime counter.
      30              :     last_checked: DateTime<Utc>,
      31              :     /// The last moment we did a successful Postgres check.
                            /// The current downtime is measured from this timestamp.
      32              :     last_up: DateTime<Utc>,
      33              : 
                            /// Total `active_time` seen by the previous statistics check.
      34              :     /// Only used for internal statistics change tracking
      35              :     /// between monitor runs and can be outdated.
      36              :     active_time: Option<f64>,
                            /// Total `sessions` seen by the previous statistics check.
      37              :     /// Only used for internal statistics change tracking
      38              :     /// between monitor runs and can be outdated.
      39              :     sessions: Option<i64>,
      40              : 
      41              :     /// Use experimental statistics-based activity monitor. It's no longer
      42              :     /// 'experimental' per se, as it's enabled for everyone, but we still
      43              :     /// keep the flag as an option to turn it off in some cases if it will
      44              :     /// misbehave.
                            /// Initialized from `ComputeFeature::ActivityMonitorExperimental`.
      45              :     experimental: bool,
      46              : }
      47              : 
      48              : impl ComputeMonitor {
      49            0 :     fn report_down(&self) {
      50            0 :         let now = Utc::now();
      51              : 
      52              :         // Calculate and report current downtime
      53              :         // (since the last time Postgres was up)
      54            0 :         let downtime = now.signed_duration_since(self.last_up);
      55            0 :         PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
      56              : 
      57              :         // Calculate and update total downtime
      58              :         // (cumulative duration of Postgres downtime in ms)
      59            0 :         let inc = now
      60            0 :             .signed_duration_since(self.last_checked)
      61            0 :             .num_milliseconds();
      62            0 :         PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
      63            0 :     }
      64              : 
      65            0 :     fn report_up(&mut self) {
      66            0 :         self.last_up = Utc::now();
      67            0 :         PG_CURR_DOWNTIME_MS.set(0.0);
      68            0 :     }
      69              : 
      70            0 :     fn downtime_info(&self) -> String {
      71            0 :         format!(
      72            0 :             "total_ms: {}, current_ms: {}, last_up: {}",
      73            0 :             PG_TOTAL_DOWNTIME_MS.get(),
      74            0 :             PG_CURR_DOWNTIME_MS.get(),
      75              :             self.last_up
      76              :         )
      77            0 :     }
      78              : 
      79              :     /// Check if compute is in some terminal or soon-to-be-terminal
      80              :     /// state, then return `true`, signalling the caller that it
      81              :     /// should exit gracefully. Otherwise, return `false`.
      82            0 :     fn check_interrupts(&mut self) -> bool {
      83            0 :         let compute_status = self.compute.get_status();
      84            0 :         if matches!(
      85            0 :             compute_status,
      86              :             ComputeStatus::Terminated
      87              :                 | ComputeStatus::TerminationPending { .. }
      88              :                 | ComputeStatus::Failed
      89              :         ) {
      90            0 :             info!(
      91            0 :                 "compute is in {} status, stopping compute monitor",
      92              :                 compute_status
      93              :             );
      94            0 :             return true;
      95            0 :         }
      96              : 
      97            0 :         false
      98            0 :     }
      99              : 
     100              :     /// Spin in a loop and figure out the last activity time in the Postgres.
     101              :     /// Then update it in the shared state. This function currently never
     102              :     /// errors out explicitly, but there is a graceful termination path.
     103              :     /// Every time we receive an error trying to check Postgres, we use
     104              :     /// [`ComputeMonitor::check_interrupts()`] because it could be that
     105              :     /// compute is being terminated already, then we can exit gracefully
     106              :     /// to not produce errors' noise in the log.
     107              :     /// NB: the only expected panic is at `Mutex` unwrap(), all other errors
     108              :     /// should be handled gracefully.
     109              :     #[instrument(skip_all)]
     110              :     pub fn run(&mut self) -> anyhow::Result<()> {
     111              :         // Suppose that `connstr` doesn't change
     112              :         let connstr = self.compute.params.connstr.clone();
     113              :         let conf = self
     114              :             .compute
     115              :             .get_conn_conf(Some("compute_ctl:compute_monitor"));
     116              : 
     117              :         // During startup and configuration we connect to every Postgres database,
     118              :         // but we don't want to count this as some user activity. So wait until
     119              :         // the compute fully started before monitoring activity.
     120              :         wait_for_postgres_start(&self.compute);
     121              : 
     122              :         // Define `client` outside of the loop to reuse existing connection if it's active.
     123              :         let mut client = conf.connect(NoTls);
     124              : 
     125              :         info!("starting compute monitor for {}", connstr);
     126              : 
     127              :         loop {
     128              :             if self.check_interrupts() {
     129              :                 break;
     130              :             }
     131              : 
     132              :             match &mut client {
     133              :                 Ok(cli) => {
     134              :                     if cli.is_closed() {
     135              :                         info!(
     136              :                             downtime_info = self.downtime_info(),
     137              :                             "connection to Postgres is closed, trying to reconnect"
     138              :                         );
     139              :                         if self.check_interrupts() {
     140              :                             break;
     141              :                         }
     142              : 
     143              :                         self.report_down();
     144              : 
     145              :                         // Connection is closed, reconnect and try again.
     146              :                         client = conf.connect(NoTls);
     147              :                     } else {
     148              :                         match self.check(cli) {
     149              :                             Ok(_) => {
     150              :                                 self.report_up();
     151              :                                 self.compute.update_last_active(self.last_active);
     152              :                             }
     153              :                             Err(e) => {
     154              :                                 error!(
     155              :                                     downtime_info = self.downtime_info(),
     156              :                                     "could not check Postgres: {}", e
     157              :                                 );
     158              :                                 if self.check_interrupts() {
     159              :                                     break;
     160              :                                 }
     161              : 
     162              :                                 // Although we have many places where we can return errors in `check()`,
     163              :                                 // normally it shouldn't happen. I.e., we will likely return error if
     164              :                                 // connection got broken, query timed out, Postgres returned invalid data, etc.
     165              :                                 // In all such cases it's suspicious, so let's report this as downtime.
     166              :                                 self.report_down();
     167              : 
     168              :                                 // Reconnect to Postgres just in case. During tests, I noticed
     169              :                                 // that queries in `check()` can fail with `connection closed`,
     170              :                                 // but `cli.is_closed()` above doesn't detect it. Even if old
     171              :                                 // connection is still alive, it will be dropped when we reassign
     172              :                                 // `client` to a new connection.
     173              :                                 client = conf.connect(NoTls);
     174              :                             }
     175              :                         }
     176              :                     }
     177              :                 }
     178              :                 Err(e) => {
     179              :                     info!(
     180              :                         downtime_info = self.downtime_info(),
     181              :                         "could not connect to Postgres: {}, retrying", e
     182              :                     );
     183              :                     if self.check_interrupts() {
     184              :                         break;
     185              :                     }
     186              : 
     187              :                     self.report_down();
     188              : 
     189              :                     // Establish a new connection and try again.
     190              :                     client = conf.connect(NoTls);
     191              :                 }
     192              :             }
     193              : 
     194              :             // Reset the `last_checked` timestamp and sleep before the next iteration.
     195              :             self.last_checked = Utc::now();
     196              :             thread::sleep(MONITOR_CHECK_INTERVAL);
     197              :         }
     198              : 
     199              :         // Graceful termination path
     200              :         Ok(())
     201              :     }
     202              : 
     203              :     #[instrument(skip_all)]
     204              :     fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
     205              :         // This is new logic, only enable if the feature flag is set.
     206              :         // TODO: remove this once we are sure that it works OR drop it altogether.
     207              :         if self.experimental {
     208              :             // Check if the total active time or sessions across all databases has changed.
     209              :             // If it did, it means that user executed some queries. In theory, it can even go down if
     210              :             // some databases were dropped, but it's still user activity.
     211              :             match get_database_stats(cli) {
     212              :                 Ok((active_time, sessions)) => {
     213              :                     let mut detected_activity = false;
     214              : 
     215              :                     if let Some(prev_active_time) = self.active_time {
     216              :                         if active_time != prev_active_time {
     217              :                             detected_activity = true;
     218              :                         }
     219              :                     }
     220              :                     self.active_time = Some(active_time);
     221              : 
     222              :                     if let Some(prev_sessions) = self.sessions {
     223              :                         if sessions != prev_sessions {
     224              :                             detected_activity = true;
     225              :                         }
     226              :                     }
     227              :                     self.sessions = Some(sessions);
     228              : 
     229              :                     if detected_activity {
     230              :                         // Update the last active time and continue, we don't need to
     231              :                         // check backends state change.
     232              :                         self.last_active = Some(Utc::now());
     233              :                         return Ok(());
     234              :                     }
     235              :                 }
     236              :                 Err(e) => {
     237              :                     return Err(anyhow::anyhow!("could not get database statistics: {}", e));
     238              :                 }
     239              :             }
     240              :         }
     241              : 
     242              :         // If database statistics are the same, check all backends for state changes.
     243              :         // Maybe there are some with more recent activity. `get_backends_state_change()`
     244              :         // can return None or stale timestamp, so it's `compute.update_last_active()`
     245              :         // responsibility to check if the new timestamp is more recent than the current one.
     246              :         // This helps us to discover new sessions that have not done anything yet.
     247              :         match get_backends_state_change(cli) {
     248              :             Ok(last_active) => match (last_active, self.last_active) {
     249              :                 (Some(last_active), Some(prev_last_active)) => {
     250              :                     if last_active > prev_last_active {
     251              :                         self.last_active = Some(last_active);
     252              :                         return Ok(());
     253              :                     }
     254              :                 }
     255              :                 (Some(last_active), None) => {
     256              :                     self.last_active = Some(last_active);
     257              :                     return Ok(());
     258              :                 }
     259              :                 _ => {}
     260              :             },
     261              :             Err(e) => {
     262              :                 return Err(anyhow::anyhow!(
     263              :                     "could not get backends state change: {}",
     264              :                     e
     265              :                 ));
     266              :             }
     267              :         }
     268              : 
     269              :         // If there are existing (logical) walsenders, do not suspend.
     270              :         //
     271              :         // N.B. walproposer doesn't currently show up in pg_stat_replication,
     272              :         // but protect if it will.
     273              :         const WS_COUNT_QUERY: &str =
     274              :             "select count(*) from pg_stat_replication where application_name != 'walproposer';";
     275              :         match cli.query_one(WS_COUNT_QUERY, &[]) {
     276              :             Ok(r) => match r.try_get::<&str, i64>("count") {
     277              :                 Ok(num_ws) => {
     278              :                     if num_ws > 0 {
     279              :                         self.last_active = Some(Utc::now());
     280              :                         return Ok(());
     281              :                     }
     282              :                 }
     283              :                 Err(e) => {
     284              :                     let err: anyhow::Error = e.into();
     285              :                     return Err(err.context("failed to parse walsenders count"));
     286              :                 }
     287              :             },
     288              :             Err(e) => {
     289              :                 return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
     290              :             }
     291              :         }
     292              : 
     293              :         // Don't suspend compute if there is an active logical replication subscription
     294              :         //
     295              :         // `where pid is not null` – to filter out read only computes and subscription on branches
     296              :         const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
     297              :             "select count(*) from pg_stat_subscription where pid is not null;";
     298              :         match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
     299              :             Ok(row) => match row.try_get::<&str, i64>("count") {
     300              :                 Ok(num_subscribers) => {
     301              :                     if num_subscribers > 0 {
     302              :                         self.last_active = Some(Utc::now());
     303              :                         return Ok(());
     304              :                     }
     305              :                 }
     306              :                 Err(e) => {
     307              :                     return Err(anyhow::anyhow!(
     308              :                         "failed to parse 'pg_stat_subscription' count: {}",
     309              :                         e
     310              :                     ));
     311              :                 }
     312              :             },
     313              :             Err(e) => {
     314              :                 return Err(anyhow::anyhow!(
     315              :                     "failed to get list of active logical replication subscriptions: {}",
     316              :                     e
     317              :                 ));
     318              :             }
     319              :         }
     320              : 
     321              :         // Do not suspend compute if autovacuum is running
     322              :         const AUTOVACUUM_COUNT_QUERY: &str =
     323              :             "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
     324              :         match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
     325              :             Ok(r) => match r.try_get::<&str, i64>("count") {
     326              :                 Ok(num_workers) => {
     327              :                     if num_workers > 0 {
     328              :                         self.last_active = Some(Utc::now());
     329              :                         return Ok(());
     330              :                     };
     331              :                 }
     332              :                 Err(e) => {
     333              :                     return Err(anyhow::anyhow!(
     334              :                         "failed to parse autovacuum workers count: {}",
     335              :                         e
     336              :                     ));
     337              :                 }
     338              :             },
     339              :             Err(e) => {
     340              :                 return Err(anyhow::anyhow!(
     341              :                     "failed to get list of autovacuum workers: {}",
     342              :                     e
     343              :                 ));
     344              :             }
     345              :         }
     346              : 
     347              :         Ok(())
     348              :     }
     349              : }
     350              : 
     351              : // Hang on condition variable waiting until the compute status is `Running`.
     352            0 : fn wait_for_postgres_start(compute: &ComputeNode) {
     353            0 :     let mut state = compute.state.lock().unwrap();
     354            0 :     while state.status != ComputeStatus::Running {
     355            0 :         info!("compute is not running, waiting before monitoring activity");
     356            0 :         state = compute.state_changed.wait(state).unwrap();
     357              : 
     358            0 :         if state.status == ComputeStatus::Running {
     359            0 :             break;
     360            0 :         }
     361              :     }
     362            0 : }
     363              : 
     364              : // Figure out the total active time and sessions across all non-system databases.
     365              : // Returned tuple is `(active_time, sessions)`.
     366              : // It can return `0.0` active time or `0` sessions, which means no user databases exist OR
     367              : // it was a start with skipped `pg_catalog` updates and user didn't do any queries
     368              : // (or open any sessions) yet.
     369            0 : fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
     370              :     // Filter out `postgres` database as `compute_ctl` and other monitoring tools
     371              :     // like `postgres_exporter` use it to query Postgres statistics.
     372              :     // Use explicit 8 bytes type casts to match Rust types.
     373            0 :     let stats = cli.query_one(
     374            0 :         "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
     375            0 :             coalesce(sum(sessions), 0)::bigint AS total_sessions
     376            0 :         FROM pg_stat_database
     377            0 :         WHERE datname NOT IN (
     378            0 :                 'postgres',
     379            0 :                 'template0',
     380            0 :                 'template1'
     381            0 :             );",
     382            0 :         &[],
     383              :     );
     384            0 :     let stats = match stats {
     385            0 :         Ok(stats) => stats,
     386            0 :         Err(e) => {
     387            0 :             return Err(anyhow::anyhow!("could not query active_time: {}", e));
     388              :         }
     389              :     };
     390              : 
     391            0 :     let active_time: f64 = match stats.try_get("total_active_time") {
     392            0 :         Ok(active_time) => active_time,
     393            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
     394              :     };
     395              : 
     396            0 :     let sessions: i64 = match stats.try_get("total_sessions") {
     397            0 :         Ok(sessions) => sessions,
     398            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
     399              :     };
     400              : 
     401            0 :     Ok((active_time, sessions))
     402            0 : }
     403              : 
     404              : // Figure out the most recent state change time across all client backends.
     405              : // If there is currently active backend, timestamp will be `Utc::now()`.
     406              : // It can return `None`, which means no client backends exist or we were
     407              : // unable to parse the timestamp.
     408            0 : fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
     409            0 :     let mut last_active: Option<DateTime<Utc>> = None;
     410              :     // Get all running client backends except ourself, use RFC3339 DateTime format.
     411            0 :     let backends = cli.query(
     412            0 :         "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
     413            0 :                 FROM pg_stat_activity
     414            0 :                     WHERE backend_type = 'client backend'
     415            0 :                     AND pid != pg_backend_pid()
     416            0 :                     AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
     417            0 :         &[],
     418              :     );
     419              : 
     420            0 :     match backends {
     421            0 :         Ok(backs) => {
     422            0 :             let mut idle_backs: Vec<DateTime<Utc>> = vec![];
     423              : 
     424            0 :             for b in backs.into_iter() {
     425            0 :                 let state: String = match b.try_get("state") {
     426            0 :                     Ok(state) => state,
     427            0 :                     Err(_) => continue,
     428              :                 };
     429              : 
     430            0 :                 if state == "idle" {
     431            0 :                     let change: String = match b.try_get("state_change") {
     432            0 :                         Ok(state_change) => state_change,
     433            0 :                         Err(_) => continue,
     434              :                     };
     435            0 :                     let change = DateTime::parse_from_rfc3339(&change);
     436            0 :                     match change {
     437            0 :                         Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
     438            0 :                         Err(e) => {
     439            0 :                             info!("cannot parse backend state_change DateTime: {}", e);
     440            0 :                             continue;
     441              :                         }
     442              :                     }
     443              :                 } else {
     444              :                     // Found non-idle backend, so the last activity is NOW.
     445              :                     // Return immediately, no need to check other backends.
     446            0 :                     return Ok(Some(Utc::now()));
     447              :                 }
     448              :             }
     449              : 
     450              :             // Get idle backend `state_change` with the max timestamp.
     451            0 :             if let Some(last) = idle_backs.iter().max() {
     452            0 :                 last_active = Some(*last);
     453            0 :             }
     454              :         }
     455            0 :         Err(e) => {
     456            0 :             return Err(anyhow::anyhow!("could not query backends: {}", e));
     457              :         }
     458              :     }
     459              : 
     460            0 :     Ok(last_active)
     461            0 : }
     462              : 
     463              : /// Launch a separate compute monitor thread and return its `JoinHandle`.
     464            0 : pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     465            0 :     let compute = Arc::clone(compute);
     466            0 :     let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
     467            0 :     let now = Utc::now();
     468            0 :     let mut monitor = ComputeMonitor {
     469            0 :         compute,
     470            0 :         last_active: None,
     471            0 :         last_checked: now,
     472            0 :         last_up: now,
     473            0 :         active_time: None,
     474            0 :         sessions: None,
     475            0 :         experimental,
     476            0 :     };
     477              : 
     478            0 :     thread::Builder::new()
     479            0 :         .name("compute-monitor".into())
     480            0 :         .spawn(move || {
     481            0 :             let span = span!(Level::INFO, "compute_monitor");
     482            0 :             let _enter = span.enter();
     483            0 :             match monitor.run() {
     484            0 :                 Ok(_) => info!("compute monitor thread terminated gracefully"),
     485            0 :                 Err(err) => error!("compute monitor thread terminated abnormally {:?}", err),
     486              :             }
     487            0 :         })
     488            0 :         .expect("cannot launch compute monitor thread")
     489            0 : }
        

Generated by: LCOV version 2.1-beta