Line data Source code
1 : use std::sync::Arc;
2 : use std::{thread, time};
3 :
4 : use chrono::{DateTime, Utc};
5 : use postgres::{Client, NoTls};
6 : use tracing::{debug, info};
7 :
8 : use crate::compute::ComputeNode;
9 :
10 : const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds; delay between two consecutive activity checks
11 :
12 : // Spin in a loop and figure out the last activity time in the Postgres.
13 : // Then update it in the shared state. This function never errors out.
14 : // XXX: the only expected panic is at `RwLock` unwrap().
15 663 : fn watch_compute_activity(compute: &ComputeNode) {
16 663 : // Suppose that `connstr` doesn't change
17 663 : let connstr = compute.connstr.as_str();
18 663 : // Define `client` outside of the loop to reuse existing connection if it's active.
19 663 : let mut client = Client::connect(connstr, NoTls);
20 663 : let timeout = time::Duration::from_millis(MONITOR_CHECK_INTERVAL);
21 663 :
22 663 : info!("watching Postgres activity at {}", connstr);
23 :
24 10539 : loop {
25 10539 : // Should be outside of the write lock to allow others to read while we sleep.
26 10539 : thread::sleep(timeout);
27 10539 :
28 10539 : match &mut client {
29 9346 : Ok(cli) => {
30 9346 : if cli.is_closed() {
31 663 : info!("connection to postgres closed, trying to reconnect");
32 :
33 : // Connection is closed, reconnect and try again.
34 0 : client = Client::connect(connstr, NoTls);
35 0 : continue;
36 8683 : }
37 8683 :
38 8683 : // Get all running client backends except ourself, use RFC3339 DateTime format.
39 8683 : let backends = cli
40 8683 : .query(
41 8683 : "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
42 8683 : FROM pg_stat_activity
43 8683 : WHERE backend_type = 'client backend'
44 8683 : AND pid != pg_backend_pid()
45 8683 : AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
46 8683 : &[],
47 8683 : );
48 8683 : let mut last_active = compute.state.lock().unwrap().last_active;
49 :
50 8683 : if let Ok(backs) = backends {
51 8648 : let mut idle_backs: Vec<DateTime<Utc>> = vec![];
52 :
53 8648 : for b in backs.into_iter() {
54 0 : let state: String = match b.try_get("state") {
55 0 : Ok(state) => state,
56 0 : Err(_) => continue,
57 : };
58 :
59 0 : if state == "idle" {
60 0 : let change: String = match b.try_get("state_change") {
61 0 : Ok(state_change) => state_change,
62 0 : Err(_) => continue,
63 : };
64 0 : let change = DateTime::parse_from_rfc3339(&change);
65 0 : match change {
66 0 : Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
67 0 : Err(e) => {
68 0 : info!("cannot parse backend state_change DateTime: {}", e);
69 0 : continue;
70 : }
71 : }
72 : } else {
73 : // Found non-idle backend, so the last activity is NOW.
74 : // Save it and exit the for loop. Also clear the idle backend
75 : // `state_change` timestamps array as it doesn't matter now.
76 0 : last_active = Some(Utc::now());
77 0 : idle_backs.clear();
78 0 : break;
79 : }
80 : }
81 :
82 : // Get idle backend `state_change` with the max timestamp.
83 8648 : if let Some(last) = idle_backs.iter().max() {
84 0 : last_active = Some(*last);
85 8648 : }
86 35 : }
87 :
88 : // Update the last activity in the shared state if we got a more recent one.
89 8683 : let mut state = compute.state.lock().unwrap();
90 8683 : // NB: `Some(<DateTime>)` is always greater than `None`.
91 8683 : if last_active > state.last_active {
92 0 : state.last_active = last_active;
93 0 : debug!("set the last compute activity time to: {:?}", last_active);
94 8683 : }
95 : }
96 1193 : Err(e) => {
97 1193 : debug!("cannot connect to postgres: {}, retrying", e);
98 :
99 : // Establish a new connection and try again.
100 1193 : client = Client::connect(connstr, NoTls);
101 : }
102 : }
103 : }
104 : }
105 :
106 : /// Launch a separate compute monitor thread and return its `JoinHandle`.
107 663 : pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
108 663 : let state = Arc::clone(state);
109 663 :
110 663 : thread::Builder::new()
111 663 : .name("compute-monitor".into())
112 663 : .spawn(move || watch_compute_activity(&state))
113 663 : .expect("cannot launch compute monitor thread")
114 663 : }
|