LCOV - differential code coverage report
Current view: top level - compute_tools/src - monitor.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 70.0 % 70 49 21 49
Current Date: 2023-10-19 02:04:12 Functions: 50.0 % 8 4 4 4
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::sync::Arc;
       2                 : use std::{thread, time::Duration};
       3                 : 
       4                 : use chrono::{DateTime, Utc};
       5                 : use postgres::{Client, NoTls};
       6                 : use tracing::{debug, info};
       7                 : 
       8                 : use crate::compute::ComputeNode;
       9                 : 
      10                 : const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500); // Pause between two consecutive activity checks.
      11                 : 
      12                 : // Poll Postgres in an endless loop, work out the time of the last client activity
      13                 : // and store it in the shared compute state. Connection and query failures are not propagated; the loop just tries again.
      14                 : // XXX: the only expected panic is the `unwrap()` on `compute.state.lock()`.
      15 CBC         641 : fn watch_compute_activity(compute: &ComputeNode) {
      16             641 :     // NB: `connstr` is read once here; this assumes it does not change afterwards.
      17             641 :     let connstr = compute.connstr.as_str();
      18             641 :     // `client` is declared outside of the loop so that a live connection is reused across iterations.
      19             641 :     let mut client = Client::connect(connstr, NoTls);
      20             641 : 
      21             641 :     info!("watching Postgres activity at {}", connstr);
      22                 : 
      23            9972 :     loop {
      24            9972 :         // Sleep before the state lock is taken, so other threads are not blocked while we wait.
      25            9972 :         thread::sleep(MONITOR_CHECK_INTERVAL);
      26            9972 : 
      27            9972 :         match &mut client {
      28            8802 :             Ok(cli) => {
      29            8802 :                 if cli.is_closed() {
      30             641 :                     info!("connection to postgres closed, trying to reconnect");
      31                 : 
      32                 :                     // Connection is closed: reconnect and retry on the next iteration.
      33 UBC           0 :                     client = Client::connect(connstr, NoTls);
      34               0 :                     continue;
      35 CBC        8161 :                 }
      36            8161 : 
      37            8161 :                 // Fetch state and last state change (rendered as RFC 3339) of every client backend except our own.
      38            8161 :                 let backends = cli
      39            8161 :                     .query(
      40            8161 :                         "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
      41            8161 :                          FROM pg_stat_activity
      42            8161 :                          WHERE backend_type = 'client backend'
      43            8161 :                             AND pid != pg_backend_pid()
      44            8161 :                             AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
      45            8161 :                         &[],
      46            8161 :                     );
      47            8161 :                 let mut last_active = compute.state.lock().unwrap().last_active;
      48                 : 
      49            8161 :                 if let Ok(backs) = backends {
      50            8120 :                     let mut idle_backs: Vec<DateTime<Utc>> = vec![];
      51                 : 
      52            8120 :                     for b in backs.into_iter() {
      53 UBC           0 :                         let state: String = match b.try_get("state") {
      54               0 :                             Ok(state) => state,
      55               0 :                             Err(_) => continue,
      56                 :                         };
      57                 : 
      58               0 :                         if state == "idle" {
      59               0 :                             let change: String = match b.try_get("state_change") {
      60               0 :                                 Ok(state_change) => state_change,
      61               0 :                                 Err(_) => continue,
      62                 :                             };
      63               0 :                             let change = DateTime::parse_from_rfc3339(&change);
      64               0 :                             match change {
      65               0 :                                 Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
      66               0 :                                 Err(e) => {
      67               0 :                                     info!("cannot parse backend state_change DateTime: {}", e);
      68               0 :                                     continue;
      69                 :                                 }
      70                 :                             }
      71                 :                         } else {
      72                 :                             // A non-idle backend means the compute is active right NOW.
      73                 :                             // Record the current time and leave the for loop. The collected idle
      74                 :                             // `state_change` timestamps no longer matter, so drop them.
      75               0 :                             last_active = Some(Utc::now());
      76               0 :                             idle_backs.clear();
      77               0 :                             break;
      78                 :                         }
      79                 :                     }
      80                 : 
      81                 :                     // If any idle timestamps remain, take the most recent `state_change` among them.
      82 CBC        8120 :                     if let Some(last) = idle_backs.iter().max() {
      83 UBC           0 :                         last_active = Some(*last);
      84 CBC        8120 :                     }
      85              41 :                 }
      86                 : 
      87                 :                 // Write the new value to the shared state only if it is more recent than the stored one.
      88            8161 :                 let mut state = compute.state.lock().unwrap();
      89            8161 :                 // NB: `Some(<DateTime>)` is always greater than `None`.
      90            8161 :                 if last_active > state.last_active {
      91 UBC           0 :                     state.last_active = last_active;
      92               0 :                     debug!("set the last compute activity time to: {:?}", last_active);
      93 CBC        8161 :                 }
      94                 :             }
      95            1170 :             Err(e) => {
      96            1170 :                 debug!("cannot connect to postgres: {}, retrying", e);
      97                 : 
      98                 :                 // Open a fresh connection; it is checked on the next iteration.
      99            1170 :                 client = Client::connect(connstr, NoTls);
     100                 :             }
     101                 :         }
     102                 :     }
     103                 : }
     104                 : 
     105                 : /// Spawn the `compute-monitor` thread running `watch_compute_activity` and return its `JoinHandle`; panics if the thread cannot be spawned.
     106             641 : pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     107             641 :     let state = Arc::clone(state);
     108             641 : 
     109             641 :     thread::Builder::new()
     110             641 :         .name("compute-monitor".into())
     111             641 :         .spawn(move || watch_compute_activity(&state))
     112             641 :         .expect("cannot launch compute monitor thread")
     113             641 : }
        

Generated by: LCOV version 2.1-beta