LCOV - code coverage report
Current view: top level - compute_tools/src - monitor.rs (source / functions) Coverage Total Hit
Test: 553e39c2773e5840c720c90d86e56f89a4330d43.info Lines: 0.0 % 137 0
Test Date: 2025-06-13 20:01:21 Functions: 0.0 % 9 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::thread;
       3              : use std::time::Duration;
       4              : 
       5              : use chrono::{DateTime, Utc};
       6              : use compute_api::responses::ComputeStatus;
       7              : use compute_api::spec::ComputeFeature;
       8              : use postgres::{Client, NoTls};
       9              : use tracing::{Level, error, info, instrument, span};
      10              : 
      11              : use crate::compute::ComputeNode;
      12              : use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
      13              : 
/// Pause between two consecutive iterations of the monitor loop in
/// `ComputeMonitor::run()`.
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
      15              : 
/// Struct to store runtime state of the compute monitor thread.
/// In theory, this could be a part of `Compute`, but i)
/// this state is expected to be accessed only by a single thread,
/// so we don't need to care about locking; ii) `Compute` is
/// already quite big. Thus, it seems to be a good idea to keep
/// all the activity/health monitoring parts here.
struct ComputeMonitor {
    /// Shared handle to the compute node; used to read the compute status,
    /// build the Postgres connection config and publish `last_active`.
    compute: Arc<ComputeNode>,

    /// The moment when Postgres had some activity
    /// that should prevent compute from being suspended.
    /// `None` until the first activity is observed.
    last_active: Option<DateTime<Utc>>,

    /// The moment when we last tried to check Postgres. Updated at the end
    /// of every monitor loop iteration and used to compute the increment
    /// of the total downtime counter.
    last_checked: DateTime<Utc>,
    /// The last moment we did a successful Postgres check. Used to compute
    /// the current (ongoing) downtime.
    last_up: DateTime<Utc>,

    /// Total `active_time` across user databases seen by the previous check.
    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    active_time: Option<f64>,
    /// Total `sessions` across user databases seen by the previous check.
    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    sessions: Option<i64>,

    /// Use the statistics-based activity monitor. It's no longer
    /// 'experimental' per se, as it's enabled for everyone, but we still
    /// keep the flag as an option to turn it off in case it misbehaves.
    experimental: bool,
}
      47              : 
      48              : impl ComputeMonitor {
      49            0 :     fn report_down(&self) {
      50            0 :         let now = Utc::now();
      51            0 : 
      52            0 :         // Calculate and report current downtime
      53            0 :         // (since the last time Postgres was up)
      54            0 :         let downtime = now.signed_duration_since(self.last_up);
      55            0 :         PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
      56            0 : 
      57            0 :         // Calculate and update total downtime
      58            0 :         // (cumulative duration of Postgres downtime in ms)
      59            0 :         let inc = now
      60            0 :             .signed_duration_since(self.last_checked)
      61            0 :             .num_milliseconds();
      62            0 :         PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
      63            0 :     }
      64              : 
      65            0 :     fn report_up(&mut self) {
      66            0 :         self.last_up = Utc::now();
      67            0 :         PG_CURR_DOWNTIME_MS.set(0.0);
      68            0 :     }
      69              : 
      70            0 :     fn downtime_info(&self) -> String {
      71            0 :         format!(
      72            0 :             "total_ms: {}, current_ms: {}, last_up: {}",
      73            0 :             PG_TOTAL_DOWNTIME_MS.get(),
      74            0 :             PG_CURR_DOWNTIME_MS.get(),
      75            0 :             self.last_up
      76            0 :         )
      77            0 :     }
      78              : 
      79              :     /// Check if compute is in some terminal or soon-to-be-terminal
      80              :     /// state, then return `true`, signalling the caller that it
      81              :     /// should exit gracefully. Otherwise, return `false`.
      82            0 :     fn check_interrupts(&mut self) -> bool {
      83            0 :         let compute_status = self.compute.get_status();
      84            0 :         if matches!(
      85            0 :             compute_status,
      86              :             ComputeStatus::Terminated | ComputeStatus::TerminationPending | ComputeStatus::Failed
      87              :         ) {
      88            0 :             info!(
      89            0 :                 "compute is in {} status, stopping compute monitor",
      90              :                 compute_status
      91              :             );
      92            0 :             return true;
      93            0 :         }
      94            0 : 
      95            0 :         false
      96            0 :     }
      97              : 
      98              :     /// Spin in a loop and figure out the last activity time in the Postgres.
      99              :     /// Then update it in the shared state. This function currently never
     100              :     /// errors out explicitly, but there is a graceful termination path.
     101              :     /// Every time we receive an error trying to check Postgres, we use
     102              :     /// [`ComputeMonitor::check_interrupts()`] because it could be that
     103              :     /// compute is being terminated already, then we can exit gracefully
     104              :     /// to not produce errors' noise in the log.
     105              :     /// NB: the only expected panic is at `Mutex` unwrap(), all other errors
     106              :     /// should be handled gracefully.
     107              :     #[instrument(skip_all)]
     108              :     pub fn run(&mut self) -> anyhow::Result<()> {
     109              :         // Suppose that `connstr` doesn't change
     110              :         let connstr = self.compute.params.connstr.clone();
     111              :         let conf = self
     112              :             .compute
     113              :             .get_conn_conf(Some("compute_ctl:compute_monitor"));
     114              : 
     115              :         // During startup and configuration we connect to every Postgres database,
     116              :         // but we don't want to count this as some user activity. So wait until
     117              :         // the compute fully started before monitoring activity.
     118              :         wait_for_postgres_start(&self.compute);
     119              : 
     120              :         // Define `client` outside of the loop to reuse existing connection if it's active.
     121              :         let mut client = conf.connect(NoTls);
     122              : 
     123              :         info!("starting compute monitor for {}", connstr);
     124              : 
     125              :         loop {
     126              :             if self.check_interrupts() {
     127              :                 break;
     128              :             }
     129              : 
     130              :             match &mut client {
     131              :                 Ok(cli) => {
     132              :                     if cli.is_closed() {
     133              :                         info!(
     134              :                             downtime_info = self.downtime_info(),
     135              :                             "connection to Postgres is closed, trying to reconnect"
     136              :                         );
     137              :                         if self.check_interrupts() {
     138              :                             break;
     139              :                         }
     140              : 
     141              :                         self.report_down();
     142              : 
     143              :                         // Connection is closed, reconnect and try again.
     144              :                         client = conf.connect(NoTls);
     145              :                     } else {
     146              :                         match self.check(cli) {
     147              :                             Ok(_) => {
     148              :                                 self.report_up();
     149              :                                 self.compute.update_last_active(self.last_active);
     150              :                             }
     151              :                             Err(e) => {
     152              :                                 error!(
     153              :                                     downtime_info = self.downtime_info(),
     154              :                                     "could not check Postgres: {}", e
     155              :                                 );
     156              :                                 if self.check_interrupts() {
     157              :                                     break;
     158              :                                 }
     159              : 
     160              :                                 // Although we have many places where we can return errors in `check()`,
     161              :                                 // normally it shouldn't happen. I.e., we will likely return error if
     162              :                                 // connection got broken, query timed out, Postgres returned invalid data, etc.
     163              :                                 // In all such cases it's suspicious, so let's report this as downtime.
     164              :                                 self.report_down();
     165              : 
     166              :                                 // Reconnect to Postgres just in case. During tests, I noticed
     167              :                                 // that queries in `check()` can fail with `connection closed`,
     168              :                                 // but `cli.is_closed()` above doesn't detect it. Even if old
     169              :                                 // connection is still alive, it will be dropped when we reassign
     170              :                                 // `client` to a new connection.
     171              :                                 client = conf.connect(NoTls);
     172              :                             }
     173              :                         }
     174              :                     }
     175              :                 }
     176              :                 Err(e) => {
     177              :                     info!(
     178              :                         downtime_info = self.downtime_info(),
     179              :                         "could not connect to Postgres: {}, retrying", e
     180              :                     );
     181              :                     if self.check_interrupts() {
     182              :                         break;
     183              :                     }
     184              : 
     185              :                     self.report_down();
     186              : 
     187              :                     // Establish a new connection and try again.
     188              :                     client = conf.connect(NoTls);
     189              :                 }
     190              :             }
     191              : 
     192              :             // Reset the `last_checked` timestamp and sleep before the next iteration.
     193              :             self.last_checked = Utc::now();
     194              :             thread::sleep(MONITOR_CHECK_INTERVAL);
     195              :         }
     196              : 
     197              :         // Graceful termination path
     198              :         Ok(())
     199              :     }
     200              : 
     201              :     #[instrument(skip_all)]
     202              :     fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
     203              :         // This is new logic, only enable if the feature flag is set.
     204              :         // TODO: remove this once we are sure that it works OR drop it altogether.
     205              :         if self.experimental {
     206              :             // Check if the total active time or sessions across all databases has changed.
     207              :             // If it did, it means that user executed some queries. In theory, it can even go down if
     208              :             // some databases were dropped, but it's still user activity.
     209              :             match get_database_stats(cli) {
     210              :                 Ok((active_time, sessions)) => {
     211              :                     let mut detected_activity = false;
     212              : 
     213              :                     if let Some(prev_active_time) = self.active_time {
     214              :                         if active_time != prev_active_time {
     215              :                             detected_activity = true;
     216              :                         }
     217              :                     }
     218              :                     self.active_time = Some(active_time);
     219              : 
     220              :                     if let Some(prev_sessions) = self.sessions {
     221              :                         if sessions != prev_sessions {
     222              :                             detected_activity = true;
     223              :                         }
     224              :                     }
     225              :                     self.sessions = Some(sessions);
     226              : 
     227              :                     if detected_activity {
     228              :                         // Update the last active time and continue, we don't need to
     229              :                         // check backends state change.
     230              :                         self.last_active = Some(Utc::now());
     231              :                         return Ok(());
     232              :                     }
     233              :                 }
     234              :                 Err(e) => {
     235              :                     return Err(anyhow::anyhow!("could not get database statistics: {}", e));
     236              :                 }
     237              :             }
     238              :         }
     239              : 
     240              :         // If database statistics are the same, check all backends for state changes.
     241              :         // Maybe there are some with more recent activity. `get_backends_state_change()`
     242              :         // can return None or stale timestamp, so it's `compute.update_last_active()`
     243              :         // responsibility to check if the new timestamp is more recent than the current one.
     244              :         // This helps us to discover new sessions that have not done anything yet.
     245              :         match get_backends_state_change(cli) {
     246              :             Ok(last_active) => match (last_active, self.last_active) {
     247              :                 (Some(last_active), Some(prev_last_active)) => {
     248              :                     if last_active > prev_last_active {
     249              :                         self.last_active = Some(last_active);
     250              :                         return Ok(());
     251              :                     }
     252              :                 }
     253              :                 (Some(last_active), None) => {
     254              :                     self.last_active = Some(last_active);
     255              :                     return Ok(());
     256              :                 }
     257              :                 _ => {}
     258              :             },
     259              :             Err(e) => {
     260              :                 return Err(anyhow::anyhow!(
     261              :                     "could not get backends state change: {}",
     262              :                     e
     263              :                 ));
     264              :             }
     265              :         }
     266              : 
     267              :         // If there are existing (logical) walsenders, do not suspend.
     268              :         //
     269              :         // N.B. walproposer doesn't currently show up in pg_stat_replication,
     270              :         // but protect if it will.
     271              :         const WS_COUNT_QUERY: &str =
     272              :             "select count(*) from pg_stat_replication where application_name != 'walproposer';";
     273              :         match cli.query_one(WS_COUNT_QUERY, &[]) {
     274              :             Ok(r) => match r.try_get::<&str, i64>("count") {
     275              :                 Ok(num_ws) => {
     276              :                     if num_ws > 0 {
     277              :                         self.last_active = Some(Utc::now());
     278              :                         return Ok(());
     279              :                     }
     280              :                 }
     281              :                 Err(e) => {
     282              :                     let err: anyhow::Error = e.into();
     283              :                     return Err(err.context("failed to parse walsenders count"));
     284              :                 }
     285              :             },
     286              :             Err(e) => {
     287              :                 return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
     288              :             }
     289              :         }
     290              : 
     291              :         // Don't suspend compute if there is an active logical replication subscription
     292              :         //
     293              :         // `where pid is not null` – to filter out read only computes and subscription on branches
     294              :         const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
     295              :             "select count(*) from pg_stat_subscription where pid is not null;";
     296              :         match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
     297              :             Ok(row) => match row.try_get::<&str, i64>("count") {
     298              :                 Ok(num_subscribers) => {
     299              :                     if num_subscribers > 0 {
     300              :                         self.last_active = Some(Utc::now());
     301              :                         return Ok(());
     302              :                     }
     303              :                 }
     304              :                 Err(e) => {
     305              :                     return Err(anyhow::anyhow!(
     306              :                         "failed to parse 'pg_stat_subscription' count: {}",
     307              :                         e
     308              :                     ));
     309              :                 }
     310              :             },
     311              :             Err(e) => {
     312              :                 return Err(anyhow::anyhow!(
     313              :                     "failed to get list of active logical replication subscriptions: {}",
     314              :                     e
     315              :                 ));
     316              :             }
     317              :         }
     318              : 
     319              :         // Do not suspend compute if autovacuum is running
     320              :         const AUTOVACUUM_COUNT_QUERY: &str =
     321              :             "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
     322              :         match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
     323              :             Ok(r) => match r.try_get::<&str, i64>("count") {
     324              :                 Ok(num_workers) => {
     325              :                     if num_workers > 0 {
     326              :                         self.last_active = Some(Utc::now());
     327              :                         return Ok(());
     328              :                     };
     329              :                 }
     330              :                 Err(e) => {
     331              :                     return Err(anyhow::anyhow!(
     332              :                         "failed to parse autovacuum workers count: {}",
     333              :                         e
     334              :                     ));
     335              :                 }
     336              :             },
     337              :             Err(e) => {
     338              :                 return Err(anyhow::anyhow!(
     339              :                     "failed to get list of autovacuum workers: {}",
     340              :                     e
     341              :                 ));
     342              :             }
     343              :         }
     344              : 
     345              :         Ok(())
     346              :     }
     347              : }
     348              : 
     349              : // Hang on condition variable waiting until the compute status is `Running`.
     350            0 : fn wait_for_postgres_start(compute: &ComputeNode) {
     351            0 :     let mut state = compute.state.lock().unwrap();
     352            0 :     while state.status != ComputeStatus::Running {
     353            0 :         info!("compute is not running, waiting before monitoring activity");
     354            0 :         state = compute.state_changed.wait(state).unwrap();
     355            0 : 
     356            0 :         if state.status == ComputeStatus::Running {
     357            0 :             break;
     358            0 :         }
     359              :     }
     360            0 : }
     361              : 
     362              : // Figure out the total active time and sessions across all non-system databases.
     363              : // Returned tuple is `(active_time, sessions)`.
     364              : // It can return `0.0` active time or `0` sessions, which means no user databases exist OR
     365              : // it was a start with skipped `pg_catalog` updates and user didn't do any queries
     366              : // (or open any sessions) yet.
     367            0 : fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
     368            0 :     // Filter out `postgres` database as `compute_ctl` and other monitoring tools
     369            0 :     // like `postgres_exporter` use it to query Postgres statistics.
     370            0 :     // Use explicit 8 bytes type casts to match Rust types.
     371            0 :     let stats = cli.query_one(
     372            0 :         "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
     373            0 :             coalesce(sum(sessions), 0)::bigint AS total_sessions
     374            0 :         FROM pg_stat_database
     375            0 :         WHERE datname NOT IN (
     376            0 :                 'postgres',
     377            0 :                 'template0',
     378            0 :                 'template1'
     379            0 :             );",
     380            0 :         &[],
     381            0 :     );
     382            0 :     let stats = match stats {
     383            0 :         Ok(stats) => stats,
     384            0 :         Err(e) => {
     385            0 :             return Err(anyhow::anyhow!("could not query active_time: {}", e));
     386              :         }
     387              :     };
     388              : 
     389            0 :     let active_time: f64 = match stats.try_get("total_active_time") {
     390            0 :         Ok(active_time) => active_time,
     391            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
     392              :     };
     393              : 
     394            0 :     let sessions: i64 = match stats.try_get("total_sessions") {
     395            0 :         Ok(sessions) => sessions,
     396            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
     397              :     };
     398              : 
     399            0 :     Ok((active_time, sessions))
     400            0 : }
     401              : 
     402              : // Figure out the most recent state change time across all client backends.
     403              : // If there is currently active backend, timestamp will be `Utc::now()`.
     404              : // It can return `None`, which means no client backends exist or we were
     405              : // unable to parse the timestamp.
     406            0 : fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
     407            0 :     let mut last_active: Option<DateTime<Utc>> = None;
     408            0 :     // Get all running client backends except ourself, use RFC3339 DateTime format.
     409            0 :     let backends = cli.query(
     410            0 :         "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
     411            0 :                 FROM pg_stat_activity
     412            0 :                     WHERE backend_type = 'client backend'
     413            0 :                     AND pid != pg_backend_pid()
     414            0 :                     AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
     415            0 :         &[],
     416            0 :     );
     417            0 : 
     418            0 :     match backends {
     419            0 :         Ok(backs) => {
     420            0 :             let mut idle_backs: Vec<DateTime<Utc>> = vec![];
     421              : 
     422            0 :             for b in backs.into_iter() {
     423            0 :                 let state: String = match b.try_get("state") {
     424            0 :                     Ok(state) => state,
     425            0 :                     Err(_) => continue,
     426              :                 };
     427              : 
     428            0 :                 if state == "idle" {
     429            0 :                     let change: String = match b.try_get("state_change") {
     430            0 :                         Ok(state_change) => state_change,
     431            0 :                         Err(_) => continue,
     432              :                     };
     433            0 :                     let change = DateTime::parse_from_rfc3339(&change);
     434            0 :                     match change {
     435            0 :                         Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
     436            0 :                         Err(e) => {
     437            0 :                             info!("cannot parse backend state_change DateTime: {}", e);
     438            0 :                             continue;
     439              :                         }
     440              :                     }
     441              :                 } else {
     442              :                     // Found non-idle backend, so the last activity is NOW.
     443              :                     // Return immediately, no need to check other backends.
     444            0 :                     return Ok(Some(Utc::now()));
     445              :                 }
     446              :             }
     447              : 
     448              :             // Get idle backend `state_change` with the max timestamp.
     449            0 :             if let Some(last) = idle_backs.iter().max() {
     450            0 :                 last_active = Some(*last);
     451            0 :             }
     452              :         }
     453            0 :         Err(e) => {
     454            0 :             return Err(anyhow::anyhow!("could not query backends: {}", e));
     455              :         }
     456              :     }
     457              : 
     458            0 :     Ok(last_active)
     459            0 : }
     460              : 
     461              : /// Launch a separate compute monitor thread and return its `JoinHandle`.
     462            0 : pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     463            0 :     let compute = Arc::clone(compute);
     464            0 :     let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
     465            0 :     let now = Utc::now();
     466            0 :     let mut monitor = ComputeMonitor {
     467            0 :         compute,
     468            0 :         last_active: None,
     469            0 :         last_checked: now,
     470            0 :         last_up: now,
     471            0 :         active_time: None,
     472            0 :         sessions: None,
     473            0 :         experimental,
     474            0 :     };
     475            0 : 
     476            0 :     thread::Builder::new()
     477            0 :         .name("compute-monitor".into())
     478            0 :         .spawn(move || {
     479            0 :             let span = span!(Level::INFO, "compute_monitor");
     480            0 :             let _enter = span.enter();
     481            0 :             match monitor.run() {
     482            0 :                 Ok(_) => info!("compute monitor thread terminated gracefully"),
     483            0 :                 Err(err) => error!("compute monitor thread terminated abnormally {:?}", err),
     484              :             }
     485            0 :         })
     486            0 :         .expect("cannot launch compute monitor thread")
     487            0 : }
        

Generated by: LCOV version 2.1-beta