LCOV - code coverage report
Current view: top level - compute_tools/src - monitor.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 63.4 % 202 128
Test Date: 2024-02-12 20:26:03 Functions: 55.0 % 20 11

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::{thread, time::Duration};
       3              : 
       4              : use chrono::{DateTime, Utc};
       5              : use postgres::{Client, NoTls};
       6              : use tracing::{debug, error, info, warn};
       7              : 
       8              : use crate::compute::ComputeNode;
       9              : use compute_api::responses::ComputeStatus;
      10              : use compute_api::spec::ComputeFeature;
      11              : 
// Pause between two consecutive iterations of the activity monitor loop.
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
      13              : 
      14              : // Spin in a loop and figure out the last activity time in the Postgres.
      15              : // Then update it in the shared state. This function never errors out.
      16              : // NB: the only expected panic is at `Mutex` unwrap(), all other errors
      17              : // should be handled gracefully.
      18          572 : fn watch_compute_activity(compute: &ComputeNode) {
      19          572 :     // Suppose that `connstr` doesn't change
      20          572 :     let connstr = compute.connstr.as_str();
      21          572 : 
      22          572 :     // During startup and configuration we connect to every Postgres database,
      23          572 :     // but we don't want to count this as some user activity. So wait until
      24          572 :     // the compute fully started before monitoring activity.
      25          572 :     wait_for_postgres_start(compute);
      26          572 : 
      27          572 :     // Define `client` outside of the loop to reuse existing connection if it's active.
      28          572 :     let mut client = Client::connect(connstr, NoTls);
      29          572 : 
      30          572 :     let mut sleep = false;
      31          572 :     let mut prev_active_time: Option<f64> = None;
      32          572 :     let mut prev_sessions: Option<i64> = None;
      33          572 : 
      34          572 :     if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
      35            0 :         info!("starting experimental activity monitor for {}", connstr);
      36              :     } else {
      37          572 :         info!("starting activity monitor for {}", connstr);
      38              :     }
      39              : 
      40        11844 :     loop {
      41        11844 :         // We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
      42        11844 :         // But skip the first sleep, so we can connect to Postgres immediately.
      43        11844 :         if sleep {
      44        11272 :             // Should be outside of the mutex lock to allow others to read while we sleep.
      45        11272 :             thread::sleep(MONITOR_CHECK_INTERVAL);
      46        11272 :         } else {
      47          572 :             sleep = true;
      48          572 :         }
      49              : 
      50        11844 :         match &mut client {
      51        11844 :             Ok(cli) => {
      52        11844 :                 if cli.is_closed() {
      53            0 :                     info!("connection to Postgres is closed, trying to reconnect");
      54              : 
      55              :                     // Connection is closed, reconnect and try again.
      56            0 :                     client = Client::connect(connstr, NoTls);
      57            0 :                     continue;
      58        11844 :                 }
      59        11844 : 
      60        11844 :                 // This is a new logic, only enable if the feature flag is set.
      61        11844 :                 // TODO: remove this once we are sure that it works OR drop it altogether.
      62        11844 :                 if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
      63              :                     // First, check if the total active time or sessions across all databases has changed.
      64              :                     // If it did, it means that user executed some queries. In theory, it can even go down if
      65              :                     // some databases were dropped, but it's still a user activity.
      66            0 :                     match get_database_stats(cli) {
      67            0 :                         Ok((active_time, sessions)) => {
      68            0 :                             let mut detected_activity = false;
      69            0 : 
      70            0 :                             prev_active_time = match prev_active_time {
      71            0 :                                 Some(prev_active_time) => {
      72            0 :                                     if active_time != prev_active_time {
      73            0 :                                         detected_activity = true;
      74            0 :                                     }
      75            0 :                                     Some(active_time)
      76              :                                 }
      77            0 :                                 None => Some(active_time),
      78              :                             };
      79            0 :                             prev_sessions = match prev_sessions {
      80            0 :                                 Some(prev_sessions) => {
      81            0 :                                     if sessions != prev_sessions {
      82            0 :                                         detected_activity = true;
      83            0 :                                     }
      84            0 :                                     Some(sessions)
      85              :                                 }
      86            0 :                                 None => Some(sessions),
      87              :                             };
      88              : 
      89            0 :                             if detected_activity {
      90              :                                 // Update the last active time and continue, we don't need to
      91              :                                 // check backends state change.
      92            0 :                                 compute.update_last_active(Some(Utc::now()));
      93            0 :                                 continue;
      94            0 :                             }
      95              :                         }
      96            0 :                         Err(e) => {
      97            0 :                             error!("could not get database statistics: {}", e);
      98            0 :                             continue;
      99              :                         }
     100              :                     }
     101        11844 :                 }
     102              : 
     103              :                 // Second, if database statistics is the same, check all backends state change,
     104              :                 // maybe there is some with more recent activity. `get_backends_state_change()`
     105              :                 // can return None or stale timestamp, so it's `compute.update_last_active()`
     106              :                 // responsibility to check if the new timestamp is more recent than the current one.
     107              :                 // This helps us to discover new sessions, that did nothing yet.
     108        11844 :                 match get_backends_state_change(cli) {
     109        11798 :                     Ok(last_active) => {
     110        11798 :                         compute.update_last_active(last_active);
     111        11798 :                     }
     112           46 :                     Err(e) => {
     113           46 :                         error!("could not get backends state change: {}", e);
     114              :                     }
     115              :                 }
     116              : 
     117              :                 // Finally, if there are existing (logical) walsenders, do not suspend.
     118              :                 //
     119              :                 // walproposer doesn't currently show up in pg_stat_replication,
     120              :                 // but protect if it will be
     121        11844 :                 let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
     122        11844 :                 match cli.query_one(ws_count_query, &[]) {
     123        11791 :                     Ok(r) => match r.try_get::<&str, i64>("count") {
     124        11791 :                         Ok(num_ws) => {
     125        11791 :                             if num_ws > 0 {
     126           40 :                                 compute.update_last_active(Some(Utc::now()));
     127           40 :                                 continue;
     128        11751 :                             }
     129              :                         }
     130            0 :                         Err(e) => {
     131            0 :                             warn!("failed to parse walsenders count: {:?}", e);
     132            0 :                             continue;
     133              :                         }
     134              :                     },
     135           53 :                     Err(e) => {
     136           53 :                         warn!("failed to get list of walsenders: {:?}", e);
     137           53 :                         continue;
     138              :                     }
     139              :                 }
     140              :                 //
     141              :                 // Don't suspend compute if there is an active logical replication subscription
     142              :                 //
     143              :                 // `where pid is not null` – to filter out read only computes and subscription on branches
     144              :                 //
     145        11751 :                 let logical_subscriptions_query =
     146        11751 :                     "select count(*) from pg_stat_subscription where pid is not null;";
     147        11751 :                 match cli.query_one(logical_subscriptions_query, &[]) {
     148        11747 :                     Ok(row) => match row.try_get::<&str, i64>("count") {
     149        11747 :                         Ok(num_subscribers) => {
     150        11747 :                             if num_subscribers > 0 {
     151            1 :                                 compute.update_last_active(Some(Utc::now()));
     152            1 :                                 continue;
     153        11746 :                             }
     154              :                         }
     155            0 :                         Err(e) => {
     156            0 :                             warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
     157            0 :                             continue;
     158              :                         }
     159              :                     },
     160            4 :                     Err(e) => {
     161            4 :                         warn!(
     162            4 :                             "failed to get list of active logical replication subscriptions: {:?}",
     163            4 :                             e
     164            4 :                         );
     165            4 :                         continue;
     166              :                     }
     167              :                 }
     168              :                 //
     169              :                 // Do not suspend compute if autovacuum is running
     170              :                 //
     171        11746 :                 let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
     172        11746 :                 match cli.query_one(autovacuum_count_query, &[]) {
     173        11742 :                     Ok(r) => match r.try_get::<&str, i64>("count") {
     174        11742 :                         Ok(num_workers) => {
     175        11742 :                             if num_workers > 0 {
     176          235 :                                 compute.update_last_active(Some(Utc::now()));
     177          235 :                                 continue;
     178        10935 :                             }
     179              :                         }
     180            0 :                         Err(e) => {
     181            0 :                             warn!("failed to parse autovacuum workers count: {:?}", e);
     182            0 :                             continue;
     183              :                         }
     184              :                     },
     185            4 :                     Err(e) => {
     186            4 :                         warn!("failed to get list of autovacuum workers: {:?}", e);
     187            4 :                         continue;
     188              :                     }
     189              :                 }
     190              :             }
     191            0 :             Err(e) => {
     192            0 :                 debug!("could not connect to Postgres: {}, retrying", e);
     193              : 
     194              :                 // Establish a new connection and try again.
     195            0 :                 client = Client::connect(connstr, NoTls);
     196              :             }
     197              :         }
     198              :     }
     199              : }
     200              : 
     201              : // Hang on condition variable waiting until the compute status is `Running`.
     202          572 : fn wait_for_postgres_start(compute: &ComputeNode) {
     203          572 :     let mut state = compute.state.lock().unwrap();
     204          572 :     while state.status != ComputeStatus::Running {
     205          572 :         info!("compute is not running, waiting before monitoring activity");
     206          572 :         state = compute.state_changed.wait(state).unwrap();
     207          572 : 
     208          572 :         if state.status == ComputeStatus::Running {
     209          572 :             break;
     210            0 :         }
     211              :     }
     212          572 : }
     213              : 
     214              : // Figure out the total active time and sessions across all non-system databases.
     215              : // Returned tuple is `(active_time, sessions)`.
     216              : // It can return `0.0` active time or `0` sessions, which means no user databases exist OR
     217              : // it was a start with skipped `pg_catalog` updates and user didn't do any queries
     218              : // (or open any sessions) yet.
     219            0 : fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
     220            0 :     // Filter out `postgres` database as `compute_ctl` and other monitoring tools
     221            0 :     // like `postgres_exporter` use it to query Postgres statistics.
     222            0 :     // Use explicit 8 bytes type casts to match Rust types.
     223            0 :     let stats = cli.query_one(
     224            0 :         "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
     225            0 :             coalesce(sum(sessions), 0)::bigint AS total_sessions
     226            0 :         FROM pg_stat_database
     227            0 :         WHERE datname NOT IN (
     228            0 :                 'postgres',
     229            0 :                 'template0',
     230            0 :                 'template1'
     231            0 :             );",
     232            0 :         &[],
     233            0 :     );
     234            0 :     let stats = match stats {
     235            0 :         Ok(stats) => stats,
     236            0 :         Err(e) => {
     237            0 :             return Err(anyhow::anyhow!("could not query active_time: {}", e));
     238              :         }
     239              :     };
     240              : 
     241            0 :     let active_time: f64 = match stats.try_get("total_active_time") {
     242            0 :         Ok(active_time) => active_time,
     243            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
     244              :     };
     245              : 
     246            0 :     let sessions: i64 = match stats.try_get("total_sessions") {
     247            0 :         Ok(sessions) => sessions,
     248            0 :         Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
     249              :     };
     250              : 
     251            0 :     Ok((active_time, sessions))
     252            0 : }
     253              : 
     254              : // Figure out the most recent state change time across all client backends.
     255              : // If there is currently active backend, timestamp will be `Utc::now()`.
     256              : // It can return `None`, which means no client backends exist or we were
     257              : // unable to parse the timestamp.
     258        11273 : fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
     259        11273 :     let mut last_active: Option<DateTime<Utc>> = None;
     260        11273 :     // Get all running client backends except ourself, use RFC3339 DateTime format.
     261        11273 :     let backends = cli.query(
     262        11273 :         "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
     263        11273 :                 FROM pg_stat_activity
     264        11273 :                     WHERE backend_type = 'client backend'
     265        11273 :                     AND pid != pg_backend_pid()
     266        11273 :                     AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
     267        11273 :         &[],
     268        11273 :     );
     269        11273 : 
     270        11273 :     match backends {
     271        11227 :         Ok(backs) => {
     272        11227 :             let mut idle_backs: Vec<DateTime<Utc>> = vec![];
     273              : 
     274        11227 :             for b in backs.into_iter() {
     275            2 :                 let state: String = match b.try_get("state") {
     276            2 :                     Ok(state) => state,
     277            0 :                     Err(_) => continue,
     278              :                 };
     279              : 
     280            2 :                 if state == "idle" {
     281            1 :                     let change: String = match b.try_get("state_change") {
     282            1 :                         Ok(state_change) => state_change,
     283            0 :                         Err(_) => continue,
     284              :                     };
     285            1 :                     let change = DateTime::parse_from_rfc3339(&change);
     286            1 :                     match change {
     287            1 :                         Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
     288            0 :                         Err(e) => {
     289            0 :                             info!("cannot parse backend state_change DateTime: {}", e);
     290            0 :                             continue;
     291              :                         }
     292              :                     }
     293              :                 } else {
     294              :                     // Found non-idle backend, so the last activity is NOW.
     295              :                     // Return immediately, no need to check other backends.
     296            1 :                     return Ok(Some(Utc::now()));
     297              :                 }
     298              :             }
     299              : 
     300              :             // Get idle backend `state_change` with the max timestamp.
     301        11225 :             if let Some(last) = idle_backs.iter().max() {
     302            1 :                 last_active = Some(*last);
     303        11224 :             }
     304              :         }
     305           46 :         Err(e) => {
     306           46 :             return Err(anyhow::anyhow!("could not query backends: {}", e));
     307              :         }
     308              :     }
     309              : 
     310        11225 :     Ok(last_active)
     311        11272 : }
     312              : 
     313              : /// Launch a separate compute monitor thread and return its `JoinHandle`.
     314          572 : pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     315          572 :     let compute = Arc::clone(compute);
     316          572 : 
     317          572 :     thread::Builder::new()
     318          572 :         .name("compute-monitor".into())
     319          572 :         .spawn(move || watch_compute_activity(&compute))
     320          572 :         .expect("cannot launch compute monitor thread")
     321          572 : }
        

Generated by: LCOV version 2.1-beta