LCOV - code coverage report
Current view: top level - compute_tools/src - monitor.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 119 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 9 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::thread;
       3              : use std::time::Duration;
       4              : 
       5              : use chrono::{DateTime, Utc};
       6              : use compute_api::responses::ComputeStatus;
       7              : use compute_api::spec::ComputeFeature;
       8              : use postgres::{Client, NoTls};
       9              : use tracing::{Level, error, info, instrument, span};
      10              : 
      11              : use crate::compute::ComputeNode;
      12              : use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
      13              : 
      14              : const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
      15              : 
/// Runtime state of the compute monitor thread.
///
/// In theory, this could be a part of `Compute`, but i) this state is
/// expected to be accessed only by a single thread, so we don't need to
/// care about locking; ii) `Compute` is already quite big. Thus, it seems
/// to be a good idea to keep all the activity/health monitoring parts here.
struct ComputeMonitor {
    /// Shared handle to the compute node this monitor reports to.
    compute: Arc<ComputeNode>,

    /// The moment when Postgres had some activity
    /// that should prevent compute from being suspended.
    last_active: Option<DateTime<Utc>>,

    /// The moment when we last tried to check Postgres.
    last_checked: DateTime<Utc>,
    /// The last moment we did a successful Postgres check.
    last_up: DateTime<Utc>,

    /// Total active time seen on the previous statistics check.
    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    active_time: Option<f64>,
    /// Total session count seen on the previous statistics check.
    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    sessions: Option<i64>,

    /// Use experimental statistics-based activity monitor. It's no longer
    /// 'experimental' per se, as it's enabled for everyone, but we still
    /// keep the flag as an option to turn it off in some cases if it
    /// misbehaves.
    experimental: bool,
}
      47              : 
      48              : impl ComputeMonitor {
      49            0 :     fn report_down(&self) {
      50            0 :         let now = Utc::now();
      51              : 
      52              :         // Calculate and report current downtime
      53              :         // (since the last time Postgres was up)
      54            0 :         let downtime = now.signed_duration_since(self.last_up);
      55            0 :         PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
      56              : 
      57              :         // Calculate and update total downtime
      58              :         // (cumulative duration of Postgres downtime in ms)
      59            0 :         let inc = now
      60            0 :             .signed_duration_since(self.last_checked)
      61            0 :             .num_milliseconds();
      62            0 :         PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
      63            0 :     }
      64              : 
      65            0 :     fn report_up(&mut self) {
      66            0 :         self.last_up = Utc::now();
      67            0 :         PG_CURR_DOWNTIME_MS.set(0.0);
      68            0 :     }
      69              : 
      70            0 :     fn downtime_info(&self) -> String {
      71            0 :         format!(
      72            0 :             "total_ms: {}, current_ms: {}, last_up: {}",
      73            0 :             PG_TOTAL_DOWNTIME_MS.get(),
      74            0 :             PG_CURR_DOWNTIME_MS.get(),
      75              :             self.last_up
      76              :         )
      77            0 :     }
      78              : 
      79              :     /// Check if compute is in some terminal or soon-to-be-terminal
      80              :     /// state, then return `true`, signalling the caller that it
      81              :     /// should exit gracefully. Otherwise, return `false`.
      82            0 :     fn check_interrupts(&mut self) -> bool {
      83            0 :         let compute_status = self.compute.get_status();
      84            0 :         if matches!(
      85            0 :             compute_status,
      86              :             ComputeStatus::Terminated
      87              :                 | ComputeStatus::TerminationPendingFast
      88              :                 | ComputeStatus::TerminationPendingImmediate
      89              :                 | ComputeStatus::Failed
      90              :         ) {
      91            0 :             info!(
      92            0 :                 "compute is in {} status, stopping compute monitor",
      93              :                 compute_status
      94              :             );
      95            0 :             return true;
      96            0 :         }
      97              : 
      98            0 :         false
      99            0 :     }
     100              : 
     101              :     /// Spin in a loop and figure out the last activity time in the Postgres.
     102              :     /// Then update it in the shared state. This function currently never
     103              :     /// errors out explicitly, but there is a graceful termination path.
     104              :     /// Every time we receive an error trying to check Postgres, we use
     105              :     /// [`ComputeMonitor::check_interrupts()`] because it could be that
     106              :     /// compute is being terminated already, then we can exit gracefully
     107              :     /// to not produce errors' noise in the log.
     108              :     /// NB: the only expected panic is at `Mutex` unwrap(), all other errors
     109              :     /// should be handled gracefully.
     110              :     #[instrument(skip_all)]
     111              :     pub fn run(&mut self) -> anyhow::Result<()> {
     112              :         // Suppose that `connstr` doesn't change
     113              :         let connstr = self.compute.params.connstr.clone();
     114              :         let conf = self
     115              :             .compute
     116              :             .get_conn_conf(Some("compute_ctl:compute_monitor"));
     117              : 
     118              :         // During startup and configuration we connect to every Postgres database,
     119              :         // but we don't want to count this as some user activity. So wait until
     120              :         // the compute fully started before monitoring activity.
     121              :         wait_for_postgres_start(&self.compute);
     122              : 
     123              :         // Define `client` outside of the loop to reuse existing connection if it's active.
     124              :         let mut client = conf.connect(NoTls);
     125              : 
     126              :         info!("starting compute monitor for {}", connstr);
     127              : 
     128              :         loop {
     129              :             if self.check_interrupts() {
     130              :                 break;
     131              :             }
     132              : 
     133              :             match &mut client {
     134              :                 Ok(cli) => {
     135              :                     if cli.is_closed() {
     136              :                         info!(
     137              :                             downtime_info = self.downtime_info(),
     138              :                             "connection to Postgres is closed, trying to reconnect"
     139              :                         );
     140              :                         if self.check_interrupts() {
     141              :                             break;
     142              :                         }
     143              : 
     144              :                         self.report_down();
     145              : 
     146              :                         // Connection is closed, reconnect and try again.
     147              :                         client = conf.connect(NoTls);
     148              :                     } else {
     149              :                         match self.check(cli) {
     150              :                             Ok(_) => {
     151              :                                 self.report_up();
     152              :                                 self.compute.update_last_active(self.last_active);
     153              :                             }
     154              :                             Err(e) => {
     155              :                                 error!(
     156              :                                     downtime_info = self.downtime_info(),
     157              :                                     "could not check Postgres: {}", e
     158              :                                 );
     159              :                                 if self.check_interrupts() {
     160              :                                     break;
     161              :                                 }
     162              : 
     163              :                                 // Although we have many places where we can return errors in `check()`,
     164              :                                 // normally it shouldn't happen. I.e., we will likely return error if
     165              :                                 // connection got broken, query timed out, Postgres returned invalid data, etc.
     166              :                                 // In all such cases it's suspicious, so let's report this as downtime.
     167              :                                 self.report_down();
     168              : 
     169              :                                 // Reconnect to Postgres just in case. During tests, I noticed
     170              :                                 // that queries in `check()` can fail with `connection closed`,
     171              :                                 // but `cli.is_closed()` above doesn't detect it. Even if old
     172              :                                 // connection is still alive, it will be dropped when we reassign
     173              :                                 // `client` to a new connection.
     174              :                                 client = conf.connect(NoTls);
     175              :                             }
     176              :                         }
     177              :                     }
     178              :                 }
     179              :                 Err(e) => {
     180              :                     info!(
     181              :                         downtime_info = self.downtime_info(),
     182              :                         "could not connect to Postgres: {}, retrying", e
     183              :                     );
     184              :                     if self.check_interrupts() {
     185              :                         break;
     186              :                     }
     187              : 
     188              :                     self.report_down();
     189              : 
     190              :                     // Establish a new connection and try again.
     191              :                     client = conf.connect(NoTls);
     192              :                 }
     193              :             }
     194              : 
     195              :             // Reset the `last_checked` timestamp and sleep before the next iteration.
     196              :             self.last_checked = Utc::now();
     197              :             thread::sleep(MONITOR_CHECK_INTERVAL);
     198              :         }
     199              : 
     200              :         // Graceful termination path
     201              :         Ok(())
     202              :     }
     203              : 
     204              :     #[instrument(skip_all)]
     205              :     fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
     206              :         // This is new logic, only enable if the feature flag is set.
     207              :         // TODO: remove this once we are sure that it works OR drop it altogether.
     208              :         if self.experimental {
     209              :             // Check if the total active time or sessions across all databases has changed.
     210              :             // If it did, it means that user executed some queries. In theory, it can even go down if
     211              :             // some databases were dropped, but it's still user activity.
     212              :             match get_database_stats(cli) {
     213              :                 Ok((active_time, sessions)) => {
     214              :                     let mut detected_activity = false;
     215              : 
     216              :                     if let Some(prev_active_time) = self.active_time {
     217              :                         if active_time != prev_active_time {
     218              :                             detected_activity = true;
     219              :                         }
     220              :                     }
     221              :                     self.active_time = Some(active_time);
     222              : 
     223              :                     if let Some(prev_sessions) = self.sessions {
     224              :                         if sessions != prev_sessions {
     225              :                             detected_activity = true;
     226              :                         }
     227              :                     }
     228              :                     self.sessions = Some(sessions);
     229              : 
     230              :                     if detected_activity {
     231              :                         // Update the last active time and continue, we don't need to
     232              :                         // check backends state change.
     233              :                         self.last_active = Some(Utc::now());
     234              :                         return Ok(());
     235              :                     }
     236              :                 }
     237              :                 Err(e) => {
     238              :                     return Err(anyhow::anyhow!("could not get database statistics: {}", e));
     239              :                 }
     240              :             }
     241              :         }
     242              : 
     243              :         // If database statistics are the same, check all backends for state changes.
     244              :         // Maybe there are some with more recent activity. `get_backends_state_change()`
     245              :         // can return None or stale timestamp, so it's `compute.update_last_active()`
     246              :         // responsibility to check if the new timestamp is more recent than the current one.
     247              :         // This helps us to discover new sessions that have not done anything yet.
     248              :         match get_backends_state_change(cli) {
     249              :             Ok(last_active) => match (last_active, self.last_active) {
     250              :                 (Some(last_active), Some(prev_last_active)) => {
     251              :                     if last_active > prev_last_active {
     252              :                         self.last_active = Some(last_active);
     253              :                         return Ok(());
     254              :                     }
     255              :                 }
     256              :                 (Some(last_active), None) => {
     257              :                     self.last_active = Some(last_active);
     258              :                     return Ok(());
     259              :                 }
     260              :                 _ => {}
     261              :             },
     262              :             Err(e) => {
     263              :                 return Err(anyhow::anyhow!(
     264              :                     "could not get backends state change: {}",
     265              :                     e
     266              :                 ));
     267              :             }
     268              :         }
     269              : 
     270              :         // If there are existing (logical) walsenders, do not suspend.
     271              :         //
     272              :         // N.B. walproposer doesn't currently show up in pg_stat_replication,
     273              :         // but protect if it will.
     274              :         const WS_COUNT_QUERY: &str =
     275              :             "select count(*) from pg_stat_replication where application_name != 'walproposer';";
     276              :         match cli.query_one(WS_COUNT_QUERY, &[]) {
     277              :             Ok(r) => match r.try_get::<&str, i64>("count") {
     278              :                 Ok(num_ws) => {
     279              :                     if num_ws > 0 {
     280              :                         self.last_active = Some(Utc::now());
     281              :                         return Ok(());
     282              :                     }
     283              :                 }
     284              :                 Err(e) => {
     285              :                     let err: anyhow::Error = e.into();
     286              :                     return Err(err.context("failed to parse walsenders count"));
     287              :                 }
     288              :             },
     289              :             Err(e) => {
     290              :                 return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
     291              :             }
     292              :         }
     293              : 
     294              :         // Don't suspend compute if there is an active logical replication subscription
     295              :         //
     296              :         // `where pid is not null` – to filter out read only computes and subscription on branches
     297              :         const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
     298              :             "select count(*) from pg_stat_subscription where pid is not null;";
     299              :         match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
     300              :             Ok(row) => match row.try_get::<&str, i64>("count") {
     301              :                 Ok(num_subscribers) => {
     302              :                     if num_subscribers > 0 {
     303              :                         self.last_active = Some(Utc::now());
     304              :                         return Ok(());
     305              :                     }
     306              :                 }
     307              :                 Err(e) => {
     308              :                     return Err(anyhow::anyhow!(
     309              :                         "failed to parse 'pg_stat_subscription' count: {}",
     310              :                         e
     311              :                     ));
     312              :                 }
     313              :             },
     314              :             Err(e) => {
     315              :                 return Err(anyhow::anyhow!(
     316              :                     "failed to get list of active logical replication subscriptions: {}",
     317              :                     e
     318              :                 ));
     319              :             }
     320              :         }
     321              : 
     322              :         // Do not suspend compute if autovacuum is running
     323              :         const AUTOVACUUM_COUNT_QUERY: &str =
     324              :             "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
     325              :         match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
     326              :             Ok(r) => match r.try_get::<&str, i64>("count") {
     327              :                 Ok(num_workers) => {
     328              :                     if num_workers > 0 {
     329              :                         self.last_active = Some(Utc::now());
     330              :                         return Ok(());
     331              :                     };
     332              :                 }
     333              :                 Err(e) => {
     334              :                     return Err(anyhow::anyhow!(
     335              :                         "failed to parse autovacuum workers count: {}",
     336              :                         e
     337              :                     ));
     338              :                 }
     339              :             },
     340              :             Err(e) => {
     341              :                 return Err(anyhow::anyhow!(
     342              :                     "failed to get list of autovacuum workers: {}",
     343              :                     e
     344              :                 ));
     345              :             }
     346              :         }
     347              : 
     348              :         Ok(())
     349              :     }
     350              : }
     351              : 
     352              : // Hang on condition variable waiting until the compute status is `Running`.
     353            0 : fn wait_for_postgres_start(compute: &ComputeNode) {
     354            0 :     let mut state = compute.state.lock().unwrap();
     355            0 :     while state.status != ComputeStatus::Running {
     356            0 :         info!("compute is not running, waiting before monitoring activity");
     357            0 :         state = compute.state_changed.wait(state).unwrap();
     358              : 
     359            0 :         if state.status == ComputeStatus::Running {
     360            0 :             break;
     361            0 :         }
     362              :     }
     363            0 : }
     364              : 
     365              : // Figure out the total active time and sessions across all non-system databases.
     366              : // Returned tuple is `(active_time, sessions)`.
     367              : // It can return `0.0` active time or `0` sessions, which means no user databases exist OR
     368              : // it was a start with skipped `pg_catalog` updates and user didn't do any queries
     369              : // (or open any sessions) yet.
     370            0 : fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
     371              :     // Filter out `postgres` database as `compute_ctl` and other monitoring tools
     372              :     // like `postgres_exporter` use it to query Postgres statistics.
     373              :     // Use explicit 8 bytes type casts to match Rust types.
     374            0 :     let stats = cli.query_one(
     375            0 :         "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
     376            0 :             coalesce(sum(sessions), 0)::bigint AS total_sessions
     377            0 :         FROM pg_stat_database
     378            0 :         WHERE datname NOT IN (
     379            0 :                 'postgres',
     380            0 :                 'template0',
     381            0 :                 'template1'
     382            0 :             );",
     383            0 :         &[],
     384              :     );
     385            0 :     let stats = match stats {
     386            0 :         Ok(stats) => stats,
     387            0 :         Err(e) => {
     388            0 :             return Err(anyhow::anyhow!("could not query active_time: {}", e));
     389              :         }
     390              :     };
     391              : 
     392            0 :     let active_time: f64 = match stats.try_get("total_active_time") {
     393            0 :         Ok(active_time) => active_time,
     394            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
     395              :     };
     396              : 
     397            0 :     let sessions: i64 = match stats.try_get("total_sessions") {
     398            0 :         Ok(sessions) => sessions,
     399            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
     400              :     };
     401              : 
     402            0 :     Ok((active_time, sessions))
     403            0 : }
     404              : 
     405              : // Figure out the most recent state change time across all client backends.
     406              : // If there is currently active backend, timestamp will be `Utc::now()`.
     407              : // It can return `None`, which means no client backends exist or we were
     408              : // unable to parse the timestamp.
     409            0 : fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
     410            0 :     let mut last_active: Option<DateTime<Utc>> = None;
     411              :     // Get all running client backends except ourself, use RFC3339 DateTime format.
     412            0 :     let backends = cli.query(
     413            0 :         "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
     414            0 :                 FROM pg_stat_activity
     415            0 :                     WHERE backend_type = 'client backend'
     416            0 :                     AND pid != pg_backend_pid()
     417            0 :                     AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
     418            0 :         &[],
     419              :     );
     420              : 
     421            0 :     match backends {
     422            0 :         Ok(backs) => {
     423            0 :             let mut idle_backs: Vec<DateTime<Utc>> = vec![];
     424              : 
     425            0 :             for b in backs.into_iter() {
     426            0 :                 let state: String = match b.try_get("state") {
     427            0 :                     Ok(state) => state,
     428            0 :                     Err(_) => continue,
     429              :                 };
     430              : 
     431            0 :                 if state == "idle" {
     432            0 :                     let change: String = match b.try_get("state_change") {
     433            0 :                         Ok(state_change) => state_change,
     434            0 :                         Err(_) => continue,
     435              :                     };
     436            0 :                     let change = DateTime::parse_from_rfc3339(&change);
     437            0 :                     match change {
     438            0 :                         Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
     439            0 :                         Err(e) => {
     440            0 :                             info!("cannot parse backend state_change DateTime: {}", e);
     441            0 :                             continue;
     442              :                         }
     443              :                     }
     444              :                 } else {
     445              :                     // Found non-idle backend, so the last activity is NOW.
     446              :                     // Return immediately, no need to check other backends.
     447            0 :                     return Ok(Some(Utc::now()));
     448              :                 }
     449              :             }
     450              : 
     451              :             // Get idle backend `state_change` with the max timestamp.
     452            0 :             if let Some(last) = idle_backs.iter().max() {
     453            0 :                 last_active = Some(*last);
     454            0 :             }
     455              :         }
     456            0 :         Err(e) => {
     457            0 :             return Err(anyhow::anyhow!("could not query backends: {}", e));
     458              :         }
     459              :     }
     460              : 
     461            0 :     Ok(last_active)
     462            0 : }
     463              : 
     464              : /// Launch a separate compute monitor thread and return its `JoinHandle`.
     465            0 : pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     466            0 :     let compute = Arc::clone(compute);
     467            0 :     let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
     468            0 :     let now = Utc::now();
     469            0 :     let mut monitor = ComputeMonitor {
     470            0 :         compute,
     471            0 :         last_active: None,
     472            0 :         last_checked: now,
     473            0 :         last_up: now,
     474            0 :         active_time: None,
     475            0 :         sessions: None,
     476            0 :         experimental,
     477            0 :     };
     478              : 
     479            0 :     thread::Builder::new()
     480            0 :         .name("compute-monitor".into())
     481            0 :         .spawn(move || {
     482            0 :             let span = span!(Level::INFO, "compute_monitor");
     483            0 :             let _enter = span.enter();
     484            0 :             match monitor.run() {
     485            0 :                 Ok(_) => info!("compute monitor thread terminated gracefully"),
     486            0 :                 Err(err) => error!("compute monitor thread terminated abnormally {:?}", err),
     487              :             }
     488            0 :         })
     489            0 :         .expect("cannot launch compute monitor thread")
     490            0 : }
        

Generated by: LCOV version 2.1-beta