use std::sync::Arc;
use std::thread;
use std::time::Duration;

use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};

use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};

const PG_DEFAULT_INIT_TIMEOUT: Duration = Duration::from_secs(60);
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);

/// Struct to store the runtime state of the compute monitor thread.
/// In theory, this could be part of `Compute`, but i) this state is
/// expected to be accessed by a single thread only, so we don't need
/// to care about locking, and ii) `Compute` is already quite big.
/// Thus, it seems to be a good idea to keep all the activity/health
/// monitoring parts here.
struct ComputeMonitor {
    compute: Arc<ComputeNode>,

    /// The last moment when Postgres had some activity
    /// that should prevent the compute from being suspended.
    last_active: Option<DateTime<Utc>>,

    /// The moment when we last tried to check Postgres.
    last_checked: DateTime<Utc>,
    /// The last moment we did a successful Postgres check.
    last_up: DateTime<Utc>,

    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    active_time: Option<f64>,
    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    sessions: Option<i64>,

    /// Use the experimental statistics-based activity monitor. It's no
    /// longer 'experimental' per se, as it's enabled for everyone, but we
    /// still keep the flag as an option to turn it off if it misbehaves.
    experimental: bool,
}

impl ComputeMonitor {
    fn report_down(&self) {
        let now = Utc::now();

        // Calculate and report the current downtime
        // (since the last time Postgres was up).
        let downtime = now.signed_duration_since(self.last_up);
        PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);

        // Calculate and update the total downtime
        // (cumulative duration of Postgres downtime in ms).
        let inc = now
            .signed_duration_since(self.last_checked)
            .num_milliseconds();
        PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
    }
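
    // A worked example of the accounting above, with hypothetical numbers:
    // with MONITOR_CHECK_INTERVAL = 500ms, three consecutive failed checks
    // leave PG_CURR_DOWNTIME_MS at roughly 1500 (now - last_up), while
    // PG_TOTAL_DOWNTIME_MS grows by ~500 per failed check (now - last_checked),
    // accumulating the same ~1500ms in total without double-counting.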

    fn report_up(&mut self) {
        self.last_up = Utc::now();
        PG_CURR_DOWNTIME_MS.set(0.0);
    }

    fn downtime_info(&self) -> String {
        format!(
            "total_ms: {}, current_ms: {}, last_up: {}",
            PG_TOTAL_DOWNTIME_MS.get(),
            PG_CURR_DOWNTIME_MS.get(),
            self.last_up
        )
    }

    /// Check whether the compute is in some terminal or soon-to-be-terminal
    /// state; if so, return `true`, signalling the caller that it should
    /// exit gracefully. Otherwise, return `false`.
    fn check_interrupts(&mut self) -> bool {
        let compute_status = self.compute.get_status();
        if matches!(
            compute_status,
            ComputeStatus::Terminated
                | ComputeStatus::TerminationPendingFast
                | ComputeStatus::TerminationPendingImmediate
                | ComputeStatus::Failed
        ) {
            info!(
                "compute is in {} status, stopping compute monitor",
                compute_status
            );
            return true;
        }

        false
    }

    /// Spin in a loop and figure out the last activity time in Postgres,
    /// then update it in the shared state. This function currently never
    /// errors out explicitly, but there is a graceful termination path.
    /// Every time we receive an error while checking Postgres, we call
    /// [`ComputeMonitor::check_interrupts()`], because it could be that the
    /// compute is already being terminated, in which case we can exit
    /// gracefully so as not to produce error noise in the log.
    /// NB: the only expected panic is at `Mutex` unwrap(); all other errors
    /// should be handled gracefully.
    #[instrument(skip_all)]
    pub fn run(&mut self) -> anyhow::Result<()> {
        // Assume that `connstr` doesn't change.
        let connstr = self.compute.params.connstr.clone();
        let conf = self
            .compute
            .get_conn_conf(Some("compute_ctl:compute_monitor"));

        // During startup and configuration we connect to every Postgres database,
        // but we don't want to count this as user activity. So wait until the
        // compute has fully started before monitoring activity.
        wait_for_postgres_start(&self.compute);

        // Define `client` outside of the loop to reuse the existing connection
        // if it's still active.
        let mut client = conf.connect(NoTls);

        info!("starting compute monitor for {}", connstr);

        loop {
            if self.check_interrupts() {
                break;
            }

            match &mut client {
                Ok(cli) => {
                    if cli.is_closed() {
                        info!(
                            downtime_info = self.downtime_info(),
                            "connection to Postgres is closed, trying to reconnect"
                        );
                        if self.check_interrupts() {
                            break;
                        }

                        self.report_down();

                        // The connection is closed; reconnect and try again.
                        client = conf.connect(NoTls);
                    } else {
                        match self.check(cli) {
                            Ok(_) => {
                                self.report_up();
                                self.compute.update_last_active(self.last_active);
                            }
                            Err(e) => {
                                error!(
                                    downtime_info = self.downtime_info(),
                                    "could not check Postgres: {}", e
                                );
                                if self.check_interrupts() {
                                    break;
                                }

                                // Although there are many places in `check()` that can
                                // return an error, it normally shouldn't happen: we will
                                // likely return an error only if the connection got broken,
                                // a query timed out, Postgres returned invalid data, etc.
                                // All such cases are suspicious, so report this as downtime.
                                self.report_down();

                                // Reconnect to Postgres just in case. During tests, I noticed
                                // that queries in `check()` can fail with `connection closed`,
                                // while `cli.is_closed()` above doesn't detect it. Even if the
                                // old connection is still alive, it will be dropped when we
                                // reassign `client` to a new connection.
                                client = conf.connect(NoTls);
                            }
                        }
                    }
                }
                Err(e) => {
                    info!(
                        downtime_info = self.downtime_info(),
                        "could not connect to Postgres: {}, retrying", e
                    );
                    if self.check_interrupts() {
                        break;
                    }

                    self.report_down();

                    // Establish a new connection and try again.
                    client = conf.connect(NoTls);
                }
            }

            // Reset the `last_checked` timestamp and sleep before the next iteration.
            self.last_checked = Utc::now();
            thread::sleep(MONITOR_CHECK_INTERVAL);
        }

        // Graceful termination path.
        Ok(())
    }

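    /// Decide whether Postgres has seen recent activity and update
    /// `self.last_active` accordingly. Any of the following counts as
    /// activity: changed per-database statistics (when `experimental` is
    /// set), a client backend state change, live walsenders other than
    /// walproposer, an active logical replication subscription, or a
    /// running autovacuum worker.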
    #[instrument(skip_all)]
    fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
        // This is new logic; it's only enabled if the feature flag is set.
        // TODO: remove this once we are sure that it works OR drop it altogether.
        if self.experimental {
            // Check if the total active time or sessions across all databases have
            // changed. If they did, it means that the user executed some queries.
            // In theory, the values can even go down if some databases were dropped,
            // but that's still user activity.
            match get_database_stats(cli) {
                Ok((active_time, sessions)) => {
                    let mut detected_activity = false;

                    if let Some(prev_active_time) = self.active_time {
                        if active_time != prev_active_time {
                            detected_activity = true;
                        }
                    }
                    self.active_time = Some(active_time);

                    if let Some(prev_sessions) = self.sessions {
                        if sessions != prev_sessions {
                            detected_activity = true;
                        }
                    }
                    self.sessions = Some(sessions);

                    if detected_activity {
                        // Update the last active time and return; we don't need to
                        // check for backend state changes.
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("could not get database statistics: {}", e));
                }
            }
        }
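
        // To illustrate the change detection above with hypothetical numbers:
        // if the previous run recorded (active_time, sessions) = (12.5, 3) and
        // this run reads (12.5, 4), a new session was opened, so this counts
        // as user activity even though no active time was accumulated.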

        // If the database statistics are unchanged, check all backends for state
        // changes. Maybe there are some with more recent activity. Since
        // `get_backends_state_change()` can return `None` or a stale timestamp,
        // it's the responsibility of `compute.update_last_active()` to check
        // whether the new timestamp is more recent than the current one.
        // This helps us to discover new sessions that have not done anything yet.
        match get_backends_state_change(cli) {
            Ok(last_active) => match (last_active, self.last_active) {
                (Some(last_active), Some(prev_last_active)) => {
                    if last_active > prev_last_active {
                        self.last_active = Some(last_active);
                        return Ok(());
                    }
                }
                (Some(last_active), None) => {
                    self.last_active = Some(last_active);
                    return Ok(());
                }
                _ => {}
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "could not get backends state change: {}",
                    e
                ));
            }
        }

        // If there are existing (logical) walsenders, do not suspend.
        //
        // N.B. walproposer doesn't currently show up in pg_stat_replication,
        // but guard against it in case it ever does.
        const WS_COUNT_QUERY: &str =
            "select count(*) from pg_stat_replication where application_name != 'walproposer';";
        match cli.query_one(WS_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_ws) => {
                    if num_ws > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    let err: anyhow::Error = e.into();
                    return Err(err.context("failed to parse walsenders count"));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
            }
        }
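
        // Note: `count(*)` surfaces as a column literally named "count" in
        // Postgres, which is why `try_get::<&str, i64>("count")` works here
        // and in the checks below.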

        // Don't suspend the compute if there is an active logical replication
        // subscription.
        //
        // `where pid is not null` filters out read-only computes and
        // subscriptions on branches.
        const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
            "select count(*) from pg_stat_subscription where pid is not null;";
        match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
            Ok(row) => match row.try_get::<&str, i64>("count") {
                Ok(num_subscribers) => {
                    if num_subscribers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse 'pg_stat_subscription' count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of active logical replication subscriptions: {}",
                    e
                ));
            }
        }

        // Do not suspend the compute if autovacuum is running.
        const AUTOVACUUM_COUNT_QUERY: &str =
            "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
        match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_workers) => {
                    if num_workers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse autovacuum workers count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of autovacuum workers: {}",
                    e
                ));
            }
        }

        Ok(())
    }
}

// Block on the condition variable, waiting until the compute status is `Running`.
fn wait_for_postgres_start(compute: &ComputeNode) {
    let mut state = compute.state.lock().unwrap();
    let pg_init_timeout = compute
        .params
        .pg_init_timeout
        .unwrap_or(PG_DEFAULT_INIT_TIMEOUT);

    while state.status != ComputeStatus::Running {
        info!("compute is not running, waiting before monitoring activity");
        if !compute.params.lakebase_mode {
            state = compute.state_changed.wait(state).unwrap();

            if state.status == ComputeStatus::Running {
                break;
            }
            continue;
        }

        if state.pg_start_time.is_some()
            && Utc::now()
                .signed_duration_since(state.pg_start_time.unwrap())
                .to_std()
                .unwrap_or_default()
                > pg_init_timeout
        {
            // If Postgres isn't up and running with working PS/SK connections within
            // `pg_init_timeout`, it is possible that we started Postgres with a wrong
            // spec (so it is talking to the wrong PS/SK nodes). To prevent dead ends,
            // we simply exit (panic) the compute node so it can restart with the
            // latest spec.
            //
            // NB: We skip this check if we have not attempted to start PG yet
            // (indicated by `state.pg_start_time == None`). This is to make sure the
            // more appropriate errors are surfaced if we encounter issues before we
            // even attempt to start PG (e.g., if we can't pull the spec, can't sync
            // safekeepers, or can't get the basebackup).
            error!(
                "compute did not enter Running state in {} seconds, exiting",
                pg_init_timeout.as_secs()
            );
            std::process::exit(1);
        }
        state = compute
            .state_changed
            .wait_timeout(state, Duration::from_secs(5))
            .unwrap()
            .0;
    }
}

// Figure out the total active time and number of sessions across all
// non-system databases. The returned tuple is `(active_time, sessions)`.
// It can return `0.0` active time or `0` sessions, which means either that no
// user databases exist, OR that this was a start with skipped `pg_catalog`
// updates and the user hasn't run any queries (or opened any sessions) yet.
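//
// For example, a compute with one user database that has served a couple of
// short queries might return something like `(37.2, 2)` (hypothetical values):
// ~37ms of accumulated active time and two sessions so far.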
fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
    // Filter out the `postgres` database, as `compute_ctl` and other monitoring
    // tools like `postgres_exporter` use it to query Postgres statistics.
    // Use explicit 8-byte type casts to match the Rust types.
    let stats = cli.query_one(
        "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
            coalesce(sum(sessions), 0)::bigint AS total_sessions
        FROM pg_stat_database
        WHERE datname NOT IN (
                'postgres',
                'template0',
                'template1'
            );",
        &[],
    );
    let stats = match stats {
        Ok(stats) => stats,
        Err(e) => {
            return Err(anyhow::anyhow!("could not query active_time: {}", e));
        }
    };

    let active_time: f64 = match stats.try_get("total_active_time") {
        Ok(active_time) => active_time,
        Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
    };

    let sessions: i64 = match stats.try_get("total_sessions") {
        Ok(sessions) => sessions,
        Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
    };

    Ok((active_time, sessions))
}

// Figure out the most recent state change time across all client backends.
// If there is a currently active backend, the timestamp will be `Utc::now()`.
// It can return `None`, which means either that no client backends exist or
// that we were unable to parse the timestamp.
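//
// The RFC 3339-style string requested from `to_char` below is what
// `DateTime::parse_from_rfc3339` expects; see the unit test sketch at the
// bottom of the file for a sanity check of that assumption.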
fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
    let mut last_active: Option<DateTime<Utc>> = None;
    // Get all running client backends except ourselves; use the RFC 3339 DateTime format.
    let backends = cli.query(
        "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
                FROM pg_stat_activity
                    WHERE backend_type = 'client backend'
                    AND pid != pg_backend_pid()
                    AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
        &[],
    );

    match backends {
        Ok(backs) => {
            let mut idle_backs: Vec<DateTime<Utc>> = vec![];

            for b in backs.into_iter() {
                let state: String = match b.try_get("state") {
                    Ok(state) => state,
                    Err(_) => continue,
                };

                if state == "idle" {
                    let change: String = match b.try_get("state_change") {
                        Ok(state_change) => state_change,
                        Err(_) => continue,
                    };
                    let change = DateTime::parse_from_rfc3339(&change);
                    match change {
                        Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
                        Err(e) => {
                            info!("cannot parse backend state_change DateTime: {}", e);
                            continue;
                        }
                    }
                } else {
                    // Found a non-idle backend, so the last activity is NOW.
                    // Return immediately; no need to check other backends.
                    return Ok(Some(Utc::now()));
                }
            }

            // Get the idle backend `state_change` with the max timestamp.
            if let Some(last) = idle_backs.iter().max() {
                last_active = Some(*last);
            }
        }
        Err(e) => {
            return Err(anyhow::anyhow!("could not query backends: {}", e));
        }
    }

    Ok(last_active)
}

/// Launch a separate compute monitor thread and return its `JoinHandle`.
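///
/// A hypothetical call-site sketch (the `ComputeNode` is assumed to come from
/// the caller's startup path):
///
/// ```ignore
/// let handle = launch_monitor(&compute);
/// // ... run the compute ...
/// handle.join().expect("compute monitor thread panicked");
/// ```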
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
    let compute = Arc::clone(compute);
    let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
    let now = Utc::now();
    let mut monitor = ComputeMonitor {
        compute,
        last_active: None,
        last_checked: now,
        last_up: now,
        active_time: None,
        sessions: None,
        experimental,
    };

    thread::Builder::new()
        .name("compute-monitor".into())
        .spawn(move || {
            let span = span!(Level::INFO, "compute_monitor");
            let _enter = span.enter();
            match monitor.run() {
                Ok(_) => info!("compute monitor thread terminated gracefully"),
                Err(err) => error!("compute monitor thread terminated abnormally: {:?}", err),
            }
        })
        .expect("cannot launch compute monitor thread")
}
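
// A minimal test sketch (an assumption-checking addition, not part of the
// original monitor logic): verify that the timestamp format we request from
// Postgres via `to_char` parses with `DateTime::parse_from_rfc3339`, as
// `get_backends_state_change()` relies on.
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};

    #[test]
    fn state_change_format_parses_as_rfc3339() {
        // A hand-written sample of what
        // to_char(..., 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') would produce.
        let s = "2025-07-26T17:20:05.123456Z";
        let parsed = DateTime::parse_from_rfc3339(s)
            .expect("to_char output should be valid RFC 3339")
            .with_timezone(&Utc);
        assert_eq!(parsed.timestamp_subsec_micros(), 123_456);
    }
}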