LCOV - differential code coverage report
Current view: top level - compute_tools/src - monitor.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 72.3 % 83 60 23 60
Current Date: 2024-01-09 02:06:09 Functions: 50.0 % 10 5 5 5
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::sync::Arc;
       2                 : use std::{thread, time::Duration};
       3                 : 
       4                 : use chrono::{DateTime, Utc};
       5                 : use postgres::{Client, NoTls};
       6                 : use tracing::{debug, info, warn};
       7                 : 
       8                 : use crate::compute::ComputeNode;
       9                 : 
      10                 : const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
      11                 : 
      12                 : // Spin in a loop and figure out the last activity time in the Postgres.
      13                 : // Then update it in the shared state. This function never errors out.
      14                 : // XXX: the only expected panic is at `RwLock` unwrap().
      15 CBC         544 : fn watch_compute_activity(compute: &ComputeNode) {
      16             544 :     // Suppose that `connstr` doesn't change
      17             544 :     let connstr = compute.connstr.as_str();
      18             544 :     // Define `client` outside of the loop to reuse existing connection if it's active.
      19             544 :     let mut client = Client::connect(connstr, NoTls);
      20             544 : 
      21             544 :     info!("watching Postgres activity at {}", connstr);
      22                 : 
      23           13602 :     loop {
      24           13602 :         // Should be outside of the write lock to allow others to read while we sleep.
      25           13602 :         thread::sleep(MONITOR_CHECK_INTERVAL);
      26           13602 : 
      27           13602 :         match &mut client {
      28           12584 :             Ok(cli) => {
      29           12584 :                 if cli.is_closed() {
      30 UBC           0 :                     info!("connection to postgres closed, trying to reconnect");
      31                 : 
      32                 :                     // Connection is closed, reconnect and try again.
      33               0 :                     client = Client::connect(connstr, NoTls);
      34               0 :                     continue;
      35 CBC       12584 :                 }
      36           12584 : 
      37           12584 :                 // Get all running client backends except ourself, use RFC3339 DateTime format.
      38           12584 :                 let backends = cli
      39           12584 :                     .query(
      40           12584 :                         "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
      41           12584 :                          FROM pg_stat_activity
      42           12584 :                          WHERE backend_type = 'client backend'
      43           12584 :                             AND pid != pg_backend_pid()
      44           12584 :                             AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
      45           12584 :                         &[],
      46           12584 :                     );
      47           12584 :                 let mut last_active = compute.state.lock().unwrap().last_active;
      48                 : 
      49           12584 :                 if let Ok(backs) = backends {
      50           12550 :                     let mut idle_backs: Vec<DateTime<Utc>> = vec![];
      51                 : 
      52           12550 :                     for b in backs.into_iter() {
      53 UBC           0 :                         let state: String = match b.try_get("state") {
      54               0 :                             Ok(state) => state,
      55               0 :                             Err(_) => continue,
      56                 :                         };
      57                 : 
      58               0 :                         if state == "idle" {
      59               0 :                             let change: String = match b.try_get("state_change") {
      60               0 :                                 Ok(state_change) => state_change,
      61               0 :                                 Err(_) => continue,
      62                 :                             };
      63               0 :                             let change = DateTime::parse_from_rfc3339(&change);
      64               0 :                             match change {
      65               0 :                                 Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
      66               0 :                                 Err(e) => {
      67               0 :                                     info!("cannot parse backend state_change DateTime: {}", e);
      68               0 :                                     continue;
      69                 :                                 }
      70                 :                             }
      71                 :                         } else {
      72                 :                             // Found non-idle backend, so the last activity is NOW.
      73                 :                             // Save it and exit the for loop. Also clear the idle backend
      74                 :                             // `state_change` timestamps array as it doesn't matter now.
      75               0 :                             last_active = Some(Utc::now());
      76               0 :                             idle_backs.clear();
      77               0 :                             break;
      78                 :                         }
      79                 :                     }
      80                 : 
      81                 :                     // Get idle backend `state_change` with the max timestamp.
      82 CBC       12006 :                     if let Some(last) = idle_backs.iter().max() {
      83 UBC           0 :                         last_active = Some(*last);
      84 CBC       12006 :                     }
      85              34 :                 }
      86                 : 
      87                 :                 // If there are existing (logical) walsenders, do not suspend.
      88                 :                 //
      89                 :                 // walproposer doesn't currently show up in pg_stat_replication,
      90                 :                 // but protect if it will be
      91           12040 :                 let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
      92           12040 :                 match cli.query_one(ws_count_query, &[]) {
      93           12003 :                     Ok(r) => match r.try_get::<&str, i64>("count") {
      94           12003 :                         Ok(num_ws) => {
      95           12003 :                             if num_ws > 0 {
      96              46 :                                 last_active = Some(Utc::now());
      97           11957 :                             }
      98                 :                         }
      99 UBC           0 :                         Err(e) => {
     100               0 :                             warn!("failed to parse ws count: {:?}", e);
     101               0 :                             continue;
     102                 :                         }
     103                 :                     },
     104 CBC          37 :                     Err(e) => {
     105              37 :                         warn!("failed to get list of walsenders: {:?}", e);
     106              37 :                         continue;
     107                 :                     }
     108                 :                 }
     109                 : 
     110                 :                 // Update the last activity in the shared state if we got a more recent one.
     111           12003 :                 let mut state = compute.state.lock().unwrap();
     112           12003 :                 // NB: `Some(<DateTime>)` is always greater than `None`.
     113           12003 :                 if last_active > state.last_active {
     114              46 :                     state.last_active = last_active;
     115              46 :                     debug!("set the last compute activity time to: {:?}", last_active);
     116           11957 :                 }
     117                 :             }
     118            1018 :             Err(e) => {
     119            1018 :                 debug!("cannot connect to postgres: {}, retrying", e);
     120                 : 
     121                 :                 // Establish a new connection and try again.
     122            1018 :                 client = Client::connect(connstr, NoTls);
     123                 :             }
     124                 :         }
     125                 :     }
     126                 : }
     127                 : 
     128                 : /// Launch a separate compute monitor thread and return its `JoinHandle`.
     129             544 : pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     130             544 :     let state = Arc::clone(state);
     131             544 : 
     132             544 :     thread::Builder::new()
     133             544 :         .name("compute-monitor".into())
     134             544 :         .spawn(move || watch_compute_activity(&state))
     135             544 :         .expect("cannot launch compute monitor thread")
     136             544 : }
        

Generated by: LCOV version 2.1-beta