TLA Line data Source code
1 : use std::sync::Arc;
2 : use std::{thread, time::Duration};
3 :
4 : use chrono::{DateTime, Utc};
5 : use postgres::{Client, NoTls};
6 : use tracing::{debug, info, warn};
7 :
8 : use crate::compute::ComputeNode;
9 :
/// Pause between two consecutive Postgres activity checks (half a second).
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_micros(500_000);
11 :
12 : // Spin in a loop and figure out the last activity time in the Postgres.
13 : // Then update it in the shared state. This function never errors out.
14 : // XXX: the only expected panic is at `RwLock` unwrap().
15 CBC 544 : fn watch_compute_activity(compute: &ComputeNode) {
16 544 : // Suppose that `connstr` doesn't change
17 544 : let connstr = compute.connstr.as_str();
18 544 : // Define `client` outside of the loop to reuse existing connection if it's active.
19 544 : let mut client = Client::connect(connstr, NoTls);
20 544 :
21 544 : info!("watching Postgres activity at {}", connstr);
22 :
23 13602 : loop {
24 13602 : // Should be outside of the write lock to allow others to read while we sleep.
25 13602 : thread::sleep(MONITOR_CHECK_INTERVAL);
26 13602 :
27 13602 : match &mut client {
28 12584 : Ok(cli) => {
29 12584 : if cli.is_closed() {
30 UBC 0 : info!("connection to postgres closed, trying to reconnect");
31 :
32 : // Connection is closed, reconnect and try again.
33 0 : client = Client::connect(connstr, NoTls);
34 0 : continue;
35 CBC 12584 : }
36 12584 :
37 12584 : // Get all running client backends except ourself, use RFC3339 DateTime format.
38 12584 : let backends = cli
39 12584 : .query(
40 12584 : "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
41 12584 : FROM pg_stat_activity
42 12584 : WHERE backend_type = 'client backend'
43 12584 : AND pid != pg_backend_pid()
44 12584 : AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
45 12584 : &[],
46 12584 : );
47 12584 : let mut last_active = compute.state.lock().unwrap().last_active;
48 :
49 12584 : if let Ok(backs) = backends {
50 12550 : let mut idle_backs: Vec<DateTime<Utc>> = vec![];
51 :
52 12550 : for b in backs.into_iter() {
53 UBC 0 : let state: String = match b.try_get("state") {
54 0 : Ok(state) => state,
55 0 : Err(_) => continue,
56 : };
57 :
58 0 : if state == "idle" {
59 0 : let change: String = match b.try_get("state_change") {
60 0 : Ok(state_change) => state_change,
61 0 : Err(_) => continue,
62 : };
63 0 : let change = DateTime::parse_from_rfc3339(&change);
64 0 : match change {
65 0 : Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
66 0 : Err(e) => {
67 0 : info!("cannot parse backend state_change DateTime: {}", e);
68 0 : continue;
69 : }
70 : }
71 : } else {
72 : // Found non-idle backend, so the last activity is NOW.
73 : // Save it and exit the for loop. Also clear the idle backend
74 : // `state_change` timestamps array as it doesn't matter now.
75 0 : last_active = Some(Utc::now());
76 0 : idle_backs.clear();
77 0 : break;
78 : }
79 : }
80 :
81 : // Get idle backend `state_change` with the max timestamp.
82 CBC 12006 : if let Some(last) = idle_backs.iter().max() {
83 UBC 0 : last_active = Some(*last);
84 CBC 12006 : }
85 34 : }
86 :
87 : // If there are existing (logical) walsenders, do not suspend.
88 : //
89 : // walproposer doesn't currently show up in pg_stat_replication,
90 : // but protect if it will be
91 12040 : let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
92 12040 : match cli.query_one(ws_count_query, &[]) {
93 12003 : Ok(r) => match r.try_get::<&str, i64>("count") {
94 12003 : Ok(num_ws) => {
95 12003 : if num_ws > 0 {
96 46 : last_active = Some(Utc::now());
97 11957 : }
98 : }
99 UBC 0 : Err(e) => {
100 0 : warn!("failed to parse ws count: {:?}", e);
101 0 : continue;
102 : }
103 : },
104 CBC 37 : Err(e) => {
105 37 : warn!("failed to get list of walsenders: {:?}", e);
106 37 : continue;
107 : }
108 : }
109 :
110 : // Update the last activity in the shared state if we got a more recent one.
111 12003 : let mut state = compute.state.lock().unwrap();
112 12003 : // NB: `Some(<DateTime>)` is always greater than `None`.
113 12003 : if last_active > state.last_active {
114 46 : state.last_active = last_active;
115 46 : debug!("set the last compute activity time to: {:?}", last_active);
116 11957 : }
117 : }
118 1018 : Err(e) => {
119 1018 : debug!("cannot connect to postgres: {}, retrying", e);
120 :
121 : // Establish a new connection and try again.
122 1018 : client = Client::connect(connstr, NoTls);
123 : }
124 : }
125 : }
126 : }
127 :
128 : /// Launch a separate compute monitor thread and return its `JoinHandle`.
129 544 : pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
130 544 : let state = Arc::clone(state);
131 544 :
132 544 : thread::Builder::new()
133 544 : .name("compute-monitor".into())
134 544 : .spawn(move || watch_compute_activity(&state))
135 544 : .expect("cannot launch compute monitor thread")
136 544 : }
|