LCOV - code coverage report
Current view: top level - compute_tools/src - monitor.rs (source / functions) Coverage Total Hit
Test: 2620485e474b48c32427149a5d91ef8fc2cd649e.info Lines: 0.0 % 124 0
Test Date: 2025-05-01 22:50:11 Functions: 0.0 % 8 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::thread;
       3              : use std::time::Duration;
       4              : 
       5              : use chrono::{DateTime, Utc};
       6              : use compute_api::responses::ComputeStatus;
       7              : use compute_api::spec::ComputeFeature;
       8              : use postgres::{Client, NoTls};
       9              : use tracing::{Level, error, info, instrument, span};
      10              : 
      11              : use crate::compute::ComputeNode;
      12              : use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
      13              : 
      14              : const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
      15              : 
      16              : struct ComputeMonitor {
      17              :     compute: Arc<ComputeNode>,
      18              : 
      19              :     /// The moment when Postgres had some activity,
      20              :     /// that should prevent compute from being suspended.
      21              :     last_active: Option<DateTime<Utc>>,
      22              : 
      23              :     /// The moment when we last tried to check Postgres.
      24              :     last_checked: DateTime<Utc>,
      25              :     /// The last moment we did a successful Postgres check.
      26              :     last_up: DateTime<Utc>,
      27              : 
      28              :     /// Only used for internal statistics change tracking
      29              :     /// between monitor runs and can be outdated.
      30              :     active_time: Option<f64>,
      31              :     /// Only used for internal statistics change tracking
      32              :     /// between monitor runs and can be outdated.
      33              :     sessions: Option<i64>,
      34              : 
      35              :     /// Use experimental statistics-based activity monitor. It's no longer
      36              :     /// 'experimental' per se, as it's enabled for everyone, but we still
      37              :     /// keep the flag as an option to turn it off in some cases if it will
      38              :     /// misbehave.
      39              :     experimental: bool,
      40              : }
      41              : 
      42              : impl ComputeMonitor {
      43            0 :     fn report_down(&self) {
      44            0 :         let now = Utc::now();
      45            0 : 
      46            0 :         // Calculate and report current downtime
      47            0 :         // (since the last time Postgres was up)
      48            0 :         let downtime = now.signed_duration_since(self.last_up);
      49            0 :         PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
      50            0 : 
      51            0 :         // Calculate and update total downtime
      52            0 :         // (cumulative duration of Postgres downtime in ms)
      53            0 :         let inc = now
      54            0 :             .signed_duration_since(self.last_checked)
      55            0 :             .num_milliseconds();
      56            0 :         PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
      57            0 :     }
      58              : 
      59            0 :     fn report_up(&mut self) {
      60            0 :         self.last_up = Utc::now();
      61            0 :         PG_CURR_DOWNTIME_MS.set(0.0);
      62            0 :     }
      63              : 
      64            0 :     fn downtime_info(&self) -> String {
      65            0 :         format!(
      66            0 :             "total_ms: {}, current_ms: {}, last_up: {}",
      67            0 :             PG_TOTAL_DOWNTIME_MS.get(),
      68            0 :             PG_CURR_DOWNTIME_MS.get(),
      69            0 :             self.last_up
      70            0 :         )
      71            0 :     }
      72              : 
      73              :     /// Spin in a loop and figure out the last activity time in the Postgres.
      74              :     /// Then update it in the shared state. This function never errors out.
      75              :     /// NB: the only expected panic is at `Mutex` unwrap(), all other errors
      76              :     /// should be handled gracefully.
      77              :     #[instrument(skip_all)]
      78              :     pub fn run(&mut self) {
      79              :         // Suppose that `connstr` doesn't change
      80              :         let connstr = self.compute.params.connstr.clone();
      81              :         let conf = self
      82              :             .compute
      83              :             .get_conn_conf(Some("compute_ctl:compute_monitor"));
      84              : 
      85              :         // During startup and configuration we connect to every Postgres database,
      86              :         // but we don't want to count this as some user activity. So wait until
      87              :         // the compute fully started before monitoring activity.
      88              :         wait_for_postgres_start(&self.compute);
      89              : 
      90              :         // Define `client` outside of the loop to reuse existing connection if it's active.
      91              :         let mut client = conf.connect(NoTls);
      92              : 
      93              :         info!("starting compute monitor for {}", connstr);
      94              : 
      95              :         loop {
      96              :             match &mut client {
      97              :                 Ok(cli) => {
      98              :                     if cli.is_closed() {
      99              :                         info!(
     100              :                             downtime_info = self.downtime_info(),
     101              :                             "connection to Postgres is closed, trying to reconnect"
     102              :                         );
     103              :                         self.report_down();
     104              : 
     105              :                         // Connection is closed, reconnect and try again.
     106              :                         client = conf.connect(NoTls);
     107              :                     } else {
     108              :                         match self.check(cli) {
     109              :                             Ok(_) => {
     110              :                                 self.report_up();
     111              :                                 self.compute.update_last_active(self.last_active);
     112              :                             }
     113              :                             Err(e) => {
     114              :                                 // Although we have many places where we can return errors in `check()`,
     115              :                                 // normally it shouldn't happen. I.e., we will likely return error if
     116              :                                 // connection got broken, query timed out, Postgres returned invalid data, etc.
     117              :                                 // In all such cases it's suspicious, so let's report this as downtime.
     118              :                                 self.report_down();
     119              :                                 error!(
     120              :                                     downtime_info = self.downtime_info(),
     121              :                                     "could not check Postgres: {}", e
     122              :                                 );
     123              : 
     124              :                                 // Reconnect to Postgres just in case. During tests, I noticed
     125              :                                 // that queries in `check()` can fail with `connection closed`,
     126              :                                 // but `cli.is_closed()` above doesn't detect it. Even if old
     127              :                                 // connection is still alive, it will be dropped when we reassign
     128              :                                 // `client` to a new connection.
     129              :                                 client = conf.connect(NoTls);
     130              :                             }
     131              :                         }
     132              :                     }
     133              :                 }
     134              :                 Err(e) => {
     135              :                     info!(
     136              :                         downtime_info = self.downtime_info(),
     137              :                         "could not connect to Postgres: {}, retrying", e
     138              :                     );
     139              :                     self.report_down();
     140              : 
     141              :                     // Establish a new connection and try again.
     142              :                     client = conf.connect(NoTls);
     143              :                 }
     144              :             }
     145              : 
     146              :             // Reset the `last_checked` timestamp and sleep before the next iteration.
     147              :             self.last_checked = Utc::now();
     148              :             thread::sleep(MONITOR_CHECK_INTERVAL);
     149              :         }
     150              :     }
     151              : 
     152              :     #[instrument(skip_all)]
     153              :     fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
     154              :         // This is new logic, only enable if the feature flag is set.
     155              :         // TODO: remove this once we are sure that it works OR drop it altogether.
     156              :         if self.experimental {
     157              :             // Check if the total active time or sessions across all databases has changed.
     158              :             // If it did, it means that user executed some queries. In theory, it can even go down if
     159              :             // some databases were dropped, but it's still user activity.
     160              :             match get_database_stats(cli) {
     161              :                 Ok((active_time, sessions)) => {
     162              :                     let mut detected_activity = false;
     163              : 
     164              :                     if let Some(prev_active_time) = self.active_time {
     165              :                         if active_time != prev_active_time {
     166              :                             detected_activity = true;
     167              :                         }
     168              :                     }
     169              :                     self.active_time = Some(active_time);
     170              : 
     171              :                     if let Some(prev_sessions) = self.sessions {
     172              :                         if sessions != prev_sessions {
     173              :                             detected_activity = true;
     174              :                         }
     175              :                     }
     176              :                     self.sessions = Some(sessions);
     177              : 
     178              :                     if detected_activity {
     179              :                         // Update the last active time and continue, we don't need to
     180              :                         // check backends state change.
     181              :                         self.last_active = Some(Utc::now());
     182              :                         return Ok(());
     183              :                     }
     184              :                 }
     185              :                 Err(e) => {
     186              :                     return Err(anyhow::anyhow!("could not get database statistics: {}", e));
     187              :                 }
     188              :             }
     189              :         }
     190              : 
     191              :         // If database statistics are the same, check all backends for state changes.
     192              :         // Maybe there are some with more recent activity. `get_backends_state_change()`
     193              :         // can return None or stale timestamp, so it's `compute.update_last_active()`
     194              :         // responsibility to check if the new timestamp is more recent than the current one.
     195              :         // This helps us to discover new sessions that have not done anything yet.
     196              :         match get_backends_state_change(cli) {
     197              :             Ok(last_active) => match (last_active, self.last_active) {
     198              :                 (Some(last_active), Some(prev_last_active)) => {
     199              :                     if last_active > prev_last_active {
     200              :                         self.last_active = Some(last_active);
     201              :                         return Ok(());
     202              :                     }
     203              :                 }
     204              :                 (Some(last_active), None) => {
     205              :                     self.last_active = Some(last_active);
     206              :                     return Ok(());
     207              :                 }
     208              :                 _ => {}
     209              :             },
     210              :             Err(e) => {
     211              :                 return Err(anyhow::anyhow!(
     212              :                     "could not get backends state change: {}",
     213              :                     e
     214              :                 ));
     215              :             }
     216              :         }
     217              : 
     218              :         // If there are existing (logical) walsenders, do not suspend.
     219              :         //
     220              :         // N.B. walproposer doesn't currently show up in pg_stat_replication,
     221              :         // but protect if it will.
     222              :         const WS_COUNT_QUERY: &str =
     223              :             "select count(*) from pg_stat_replication where application_name != 'walproposer';";
     224              :         match cli.query_one(WS_COUNT_QUERY, &[]) {
     225              :             Ok(r) => match r.try_get::<&str, i64>("count") {
     226              :                 Ok(num_ws) => {
     227              :                     if num_ws > 0 {
     228              :                         self.last_active = Some(Utc::now());
     229              :                         return Ok(());
     230              :                     }
     231              :                 }
     232              :                 Err(e) => {
     233              :                     let err: anyhow::Error = e.into();
     234              :                     return Err(err.context("failed to parse walsenders count"));
     235              :                 }
     236              :             },
     237              :             Err(e) => {
     238              :                 return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
     239              :             }
     240              :         }
     241              : 
     242              :         // Don't suspend compute if there is an active logical replication subscription
     243              :         //
     244              :         // `where pid is not null` – to filter out read only computes and subscription on branches
     245              :         const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
     246              :             "select count(*) from pg_stat_subscription where pid is not null;";
     247              :         match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
     248              :             Ok(row) => match row.try_get::<&str, i64>("count") {
     249              :                 Ok(num_subscribers) => {
     250              :                     if num_subscribers > 0 {
     251              :                         self.last_active = Some(Utc::now());
     252              :                         return Ok(());
     253              :                     }
     254              :                 }
     255              :                 Err(e) => {
     256              :                     return Err(anyhow::anyhow!(
     257              :                         "failed to parse 'pg_stat_subscription' count: {}",
     258              :                         e
     259              :                     ));
     260              :                 }
     261              :             },
     262              :             Err(e) => {
     263              :                 return Err(anyhow::anyhow!(
     264              :                     "failed to get list of active logical replication subscriptions: {}",
     265              :                     e
     266              :                 ));
     267              :             }
     268              :         }
     269              : 
     270              :         // Do not suspend compute if autovacuum is running
     271              :         const AUTOVACUUM_COUNT_QUERY: &str =
     272              :             "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
     273              :         match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
     274              :             Ok(r) => match r.try_get::<&str, i64>("count") {
     275              :                 Ok(num_workers) => {
     276              :                     if num_workers > 0 {
     277              :                         self.last_active = Some(Utc::now());
     278              :                         return Ok(());
     279              :                     };
     280              :                 }
     281              :                 Err(e) => {
     282              :                     return Err(anyhow::anyhow!(
     283              :                         "failed to parse autovacuum workers count: {}",
     284              :                         e
     285              :                     ));
     286              :                 }
     287              :             },
     288              :             Err(e) => {
     289              :                 return Err(anyhow::anyhow!(
     290              :                     "failed to get list of autovacuum workers: {}",
     291              :                     e
     292              :                 ));
     293              :             }
     294              :         }
     295              : 
     296              :         Ok(())
     297              :     }
     298              : }
     299              : 
     300              : // Hang on condition variable waiting until the compute status is `Running`.
     301            0 : fn wait_for_postgres_start(compute: &ComputeNode) {
     302            0 :     let mut state = compute.state.lock().unwrap();
     303            0 :     while state.status != ComputeStatus::Running {
     304            0 :         info!("compute is not running, waiting before monitoring activity");
     305            0 :         state = compute.state_changed.wait(state).unwrap();
     306            0 : 
     307            0 :         if state.status == ComputeStatus::Running {
     308            0 :             break;
     309            0 :         }
     310              :     }
     311            0 : }
     312              : 
     313              : // Figure out the total active time and sessions across all non-system databases.
     314              : // Returned tuple is `(active_time, sessions)`.
     315              : // It can return `0.0` active time or `0` sessions, which means no user databases exist OR
     316              : // it was a start with skipped `pg_catalog` updates and user didn't do any queries
     317              : // (or open any sessions) yet.
     318            0 : fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
     319            0 :     // Filter out `postgres` database as `compute_ctl` and other monitoring tools
     320            0 :     // like `postgres_exporter` use it to query Postgres statistics.
     321            0 :     // Use explicit 8 bytes type casts to match Rust types.
     322            0 :     let stats = cli.query_one(
     323            0 :         "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
     324            0 :             coalesce(sum(sessions), 0)::bigint AS total_sessions
     325            0 :         FROM pg_stat_database
     326            0 :         WHERE datname NOT IN (
     327            0 :                 'postgres',
     328            0 :                 'template0',
     329            0 :                 'template1'
     330            0 :             );",
     331            0 :         &[],
     332            0 :     );
     333            0 :     let stats = match stats {
     334            0 :         Ok(stats) => stats,
     335            0 :         Err(e) => {
     336            0 :             return Err(anyhow::anyhow!("could not query active_time: {}", e));
     337              :         }
     338              :     };
     339              : 
     340            0 :     let active_time: f64 = match stats.try_get("total_active_time") {
     341            0 :         Ok(active_time) => active_time,
     342            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
     343              :     };
     344              : 
     345            0 :     let sessions: i64 = match stats.try_get("total_sessions") {
     346            0 :         Ok(sessions) => sessions,
     347            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
     348              :     };
     349              : 
     350            0 :     Ok((active_time, sessions))
     351            0 : }
     352              : 
     353              : // Figure out the most recent state change time across all client backends.
     354              : // If there is currently active backend, timestamp will be `Utc::now()`.
     355              : // It can return `None`, which means no client backends exist or we were
     356              : // unable to parse the timestamp.
     357            0 : fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
     358            0 :     let mut last_active: Option<DateTime<Utc>> = None;
     359            0 :     // Get all running client backends except ourself, use RFC3339 DateTime format.
     360            0 :     let backends = cli.query(
     361            0 :         "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
     362            0 :                 FROM pg_stat_activity
     363            0 :                     WHERE backend_type = 'client backend'
     364            0 :                     AND pid != pg_backend_pid()
     365            0 :                     AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
     366            0 :         &[],
     367            0 :     );
     368            0 : 
     369            0 :     match backends {
     370            0 :         Ok(backs) => {
     371            0 :             let mut idle_backs: Vec<DateTime<Utc>> = vec![];
     372              : 
     373            0 :             for b in backs.into_iter() {
     374            0 :                 let state: String = match b.try_get("state") {
     375            0 :                     Ok(state) => state,
     376            0 :                     Err(_) => continue,
     377              :                 };
     378              : 
     379            0 :                 if state == "idle" {
     380            0 :                     let change: String = match b.try_get("state_change") {
     381            0 :                         Ok(state_change) => state_change,
     382            0 :                         Err(_) => continue,
     383              :                     };
     384            0 :                     let change = DateTime::parse_from_rfc3339(&change);
     385            0 :                     match change {
     386            0 :                         Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
     387            0 :                         Err(e) => {
     388            0 :                             info!("cannot parse backend state_change DateTime: {}", e);
     389            0 :                             continue;
     390              :                         }
     391              :                     }
     392              :                 } else {
     393              :                     // Found non-idle backend, so the last activity is NOW.
     394              :                     // Return immediately, no need to check other backends.
     395            0 :                     return Ok(Some(Utc::now()));
     396              :                 }
     397              :             }
     398              : 
     399              :             // Get idle backend `state_change` with the max timestamp.
     400            0 :             if let Some(last) = idle_backs.iter().max() {
     401            0 :                 last_active = Some(*last);
     402            0 :             }
     403              :         }
     404            0 :         Err(e) => {
     405            0 :             return Err(anyhow::anyhow!("could not query backends: {}", e));
     406              :         }
     407              :     }
     408              : 
     409            0 :     Ok(last_active)
     410            0 : }
     411              : 
     412              : /// Launch a separate compute monitor thread and return its `JoinHandle`.
     413            0 : pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     414            0 :     let compute = Arc::clone(compute);
     415            0 :     let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
     416            0 :     let now = Utc::now();
     417            0 :     let mut monitor = ComputeMonitor {
     418            0 :         compute,
     419            0 :         last_active: None,
     420            0 :         last_checked: now,
     421            0 :         last_up: now,
     422            0 :         active_time: None,
     423            0 :         sessions: None,
     424            0 :         experimental,
     425            0 :     };
     426            0 : 
     427            0 :     thread::Builder::new()
     428            0 :         .name("compute-monitor".into())
     429            0 :         .spawn(move || {
     430            0 :             let span = span!(Level::INFO, "compute_monitor");
     431            0 :             let _enter = span.enter();
     432            0 :             monitor.run();
     433            0 :         })
     434            0 :         .expect("cannot launch compute monitor thread")
     435            0 : }
        

Generated by: LCOV version 2.1-beta