TLA Line data Source code
1 : use std::sync::Arc;
2 : use std::{thread, time::Duration};
3 :
4 : use chrono::{DateTime, Utc};
5 : use postgres::{Client, NoTls};
6 : use tracing::{debug, info};
7 :
8 : use crate::compute::ComputeNode;
9 :
// Pause between two consecutive iterations of the activity-monitor loop:
// the monitor thread sleeps for this long before every check.
10 : const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
11 :
12 : // Spin in a loop and figure out the last activity time in the Postgres.
13 : // Then update it in the shared state. This function never errors out.
14 : // XXX: the only expected panic is at `RwLock` unwrap().
15 CBC 641 : fn watch_compute_activity(compute: &ComputeNode) {
16 641 : // Suppose that `connstr` doesn't change
17 641 : let connstr = compute.connstr.as_str();
18 641 : // Define `client` outside of the loop to reuse existing connection if it's active.
19 641 : let mut client = Client::connect(connstr, NoTls);
20 641 :
21 641 : info!("watching Postgres activity at {}", connstr);
22 :
23 9972 : loop {
24 9972 : // Should be outside of the write lock to allow others to read while we sleep.
25 9972 : thread::sleep(MONITOR_CHECK_INTERVAL);
26 9972 :
27 9972 : match &mut client {
28 8802 : Ok(cli) => {
29 8802 : if cli.is_closed() {
30 641 : info!("connection to postgres closed, trying to reconnect");
31 :
32 : // Connection is closed, reconnect and try again.
33 UBC 0 : client = Client::connect(connstr, NoTls);
34 0 : continue;
35 CBC 8161 : }
36 8161 :
37 8161 : // Get all running client backends except ourself, use RFC3339 DateTime format.
38 8161 : let backends = cli
39 8161 : .query(
40 8161 : "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
41 8161 : FROM pg_stat_activity
42 8161 : WHERE backend_type = 'client backend'
43 8161 : AND pid != pg_backend_pid()
44 8161 : AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
45 8161 : &[],
46 8161 : );
47 8161 : let mut last_active = compute.state.lock().unwrap().last_active;
48 :
49 8161 : if let Ok(backs) = backends {
50 8120 : let mut idle_backs: Vec<DateTime<Utc>> = vec![];
51 :
52 8120 : for b in backs.into_iter() {
53 UBC 0 : let state: String = match b.try_get("state") {
54 0 : Ok(state) => state,
55 0 : Err(_) => continue,
56 : };
57 :
58 0 : if state == "idle" {
59 0 : let change: String = match b.try_get("state_change") {
60 0 : Ok(state_change) => state_change,
61 0 : Err(_) => continue,
62 : };
63 0 : let change = DateTime::parse_from_rfc3339(&change);
64 0 : match change {
65 0 : Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
66 0 : Err(e) => {
67 0 : info!("cannot parse backend state_change DateTime: {}", e);
68 0 : continue;
69 : }
70 : }
71 : } else {
72 : // Found non-idle backend, so the last activity is NOW.
73 : // Save it and exit the for loop. Also clear the idle backend
74 : // `state_change` timestamps array as it doesn't matter now.
75 0 : last_active = Some(Utc::now());
76 0 : idle_backs.clear();
77 0 : break;
78 : }
79 : }
80 :
81 : // Get idle backend `state_change` with the max timestamp.
82 CBC 8120 : if let Some(last) = idle_backs.iter().max() {
83 UBC 0 : last_active = Some(*last);
84 CBC 8120 : }
85 41 : }
86 :
87 : // Update the last activity in the shared state if we got a more recent one.
88 8161 : let mut state = compute.state.lock().unwrap();
89 8161 : // NB: `Some(<DateTime>)` is always greater than `None`.
90 8161 : if last_active > state.last_active {
91 UBC 0 : state.last_active = last_active;
92 0 : debug!("set the last compute activity time to: {:?}", last_active);
93 CBC 8161 : }
94 : }
95 1170 : Err(e) => {
96 1170 : debug!("cannot connect to postgres: {}, retrying", e);
97 :
98 : // Establish a new connection and try again.
99 1170 : client = Client::connect(connstr, NoTls);
100 : }
101 : }
102 : }
103 : }
104 :
105 : /// Launch a separate compute monitor thread and return its `JoinHandle`.
106 641 : pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
107 641 : let state = Arc::clone(state);
108 641 :
109 641 : thread::Builder::new()
110 641 : .name("compute-monitor".into())
111 641 : .spawn(move || watch_compute_activity(&state))
112 641 : .expect("cannot launch compute monitor thread")
113 641 : }
|