Line data Source code
1 : use std::sync::Arc;
2 : use std::{thread, time::Duration};
3 :
4 : use chrono::{DateTime, Utc};
5 : use postgres::{Client, NoTls};
6 : use tracing::{debug, error, info, warn};
7 :
8 : use crate::compute::ComputeNode;
9 : use compute_api::responses::ComputeStatus;
10 : use compute_api::spec::ComputeFeature;
11 :
/// Pause between two consecutive passes of the activity monitor loop
/// (the loop sleeps for this long at the top of every iteration but the first).
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
13 :
14 : // Spin in a loop and figure out the last activity time in the Postgres.
15 : // Then update it in the shared state. This function never errors out.
16 : // NB: the only expected panic is at `Mutex` unwrap(), all other errors
17 : // should be handled gracefully.
18 572 : fn watch_compute_activity(compute: &ComputeNode) {
19 572 : // Suppose that `connstr` doesn't change
20 572 : let connstr = compute.connstr.as_str();
21 572 :
22 572 : // During startup and configuration we connect to every Postgres database,
23 572 : // but we don't want to count this as some user activity. So wait until
24 572 : // the compute fully started before monitoring activity.
25 572 : wait_for_postgres_start(compute);
26 572 :
27 572 : // Define `client` outside of the loop to reuse existing connection if it's active.
28 572 : let mut client = Client::connect(connstr, NoTls);
29 572 :
30 572 : let mut sleep = false;
31 572 : let mut prev_active_time: Option<f64> = None;
32 572 : let mut prev_sessions: Option<i64> = None;
33 572 :
34 572 : if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
35 0 : info!("starting experimental activity monitor for {}", connstr);
36 : } else {
37 572 : info!("starting activity monitor for {}", connstr);
38 : }
39 :
40 11844 : loop {
41 11844 : // We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
42 11844 : // But skip the first sleep, so we can connect to Postgres immediately.
43 11844 : if sleep {
44 11272 : // Should be outside of the mutex lock to allow others to read while we sleep.
45 11272 : thread::sleep(MONITOR_CHECK_INTERVAL);
46 11272 : } else {
47 572 : sleep = true;
48 572 : }
49 :
50 11844 : match &mut client {
51 11844 : Ok(cli) => {
52 11844 : if cli.is_closed() {
53 0 : info!("connection to Postgres is closed, trying to reconnect");
54 :
55 : // Connection is closed, reconnect and try again.
56 0 : client = Client::connect(connstr, NoTls);
57 0 : continue;
58 11844 : }
59 11844 :
60 11844 : // This is a new logic, only enable if the feature flag is set.
61 11844 : // TODO: remove this once we are sure that it works OR drop it altogether.
62 11844 : if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
63 : // First, check if the total active time or sessions across all databases has changed.
64 : // If it did, it means that user executed some queries. In theory, it can even go down if
65 : // some databases were dropped, but it's still a user activity.
66 0 : match get_database_stats(cli) {
67 0 : Ok((active_time, sessions)) => {
68 0 : let mut detected_activity = false;
69 0 :
70 0 : prev_active_time = match prev_active_time {
71 0 : Some(prev_active_time) => {
72 0 : if active_time != prev_active_time {
73 0 : detected_activity = true;
74 0 : }
75 0 : Some(active_time)
76 : }
77 0 : None => Some(active_time),
78 : };
79 0 : prev_sessions = match prev_sessions {
80 0 : Some(prev_sessions) => {
81 0 : if sessions != prev_sessions {
82 0 : detected_activity = true;
83 0 : }
84 0 : Some(sessions)
85 : }
86 0 : None => Some(sessions),
87 : };
88 :
89 0 : if detected_activity {
90 : // Update the last active time and continue, we don't need to
91 : // check backends state change.
92 0 : compute.update_last_active(Some(Utc::now()));
93 0 : continue;
94 0 : }
95 : }
96 0 : Err(e) => {
97 0 : error!("could not get database statistics: {}", e);
98 0 : continue;
99 : }
100 : }
101 11844 : }
102 :
103 : // Second, if database statistics is the same, check all backends state change,
104 : // maybe there is some with more recent activity. `get_backends_state_change()`
105 : // can return None or stale timestamp, so it's `compute.update_last_active()`
106 : // responsibility to check if the new timestamp is more recent than the current one.
107 : // This helps us to discover new sessions, that did nothing yet.
108 11844 : match get_backends_state_change(cli) {
109 11798 : Ok(last_active) => {
110 11798 : compute.update_last_active(last_active);
111 11798 : }
112 46 : Err(e) => {
113 46 : error!("could not get backends state change: {}", e);
114 : }
115 : }
116 :
117 : // Finally, if there are existing (logical) walsenders, do not suspend.
118 : //
119 : // walproposer doesn't currently show up in pg_stat_replication,
120 : // but protect if it will be
121 11844 : let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
122 11844 : match cli.query_one(ws_count_query, &[]) {
123 11791 : Ok(r) => match r.try_get::<&str, i64>("count") {
124 11791 : Ok(num_ws) => {
125 11791 : if num_ws > 0 {
126 40 : compute.update_last_active(Some(Utc::now()));
127 40 : continue;
128 11751 : }
129 : }
130 0 : Err(e) => {
131 0 : warn!("failed to parse walsenders count: {:?}", e);
132 0 : continue;
133 : }
134 : },
135 53 : Err(e) => {
136 53 : warn!("failed to get list of walsenders: {:?}", e);
137 53 : continue;
138 : }
139 : }
140 : //
141 : // Don't suspend compute if there is an active logical replication subscription
142 : //
143 : // `where pid is not null` – to filter out read only computes and subscription on branches
144 : //
145 11751 : let logical_subscriptions_query =
146 11751 : "select count(*) from pg_stat_subscription where pid is not null;";
147 11751 : match cli.query_one(logical_subscriptions_query, &[]) {
148 11747 : Ok(row) => match row.try_get::<&str, i64>("count") {
149 11747 : Ok(num_subscribers) => {
150 11747 : if num_subscribers > 0 {
151 1 : compute.update_last_active(Some(Utc::now()));
152 1 : continue;
153 11746 : }
154 : }
155 0 : Err(e) => {
156 0 : warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
157 0 : continue;
158 : }
159 : },
160 4 : Err(e) => {
161 4 : warn!(
162 4 : "failed to get list of active logical replication subscriptions: {:?}",
163 4 : e
164 4 : );
165 4 : continue;
166 : }
167 : }
168 : //
169 : // Do not suspend compute if autovacuum is running
170 : //
171 11746 : let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
172 11746 : match cli.query_one(autovacuum_count_query, &[]) {
173 11742 : Ok(r) => match r.try_get::<&str, i64>("count") {
174 11742 : Ok(num_workers) => {
175 11742 : if num_workers > 0 {
176 235 : compute.update_last_active(Some(Utc::now()));
177 235 : continue;
178 10935 : }
179 : }
180 0 : Err(e) => {
181 0 : warn!("failed to parse autovacuum workers count: {:?}", e);
182 0 : continue;
183 : }
184 : },
185 4 : Err(e) => {
186 4 : warn!("failed to get list of autovacuum workers: {:?}", e);
187 4 : continue;
188 : }
189 : }
190 : }
191 0 : Err(e) => {
192 0 : debug!("could not connect to Postgres: {}, retrying", e);
193 :
194 : // Establish a new connection and try again.
195 0 : client = Client::connect(connstr, NoTls);
196 : }
197 : }
198 : }
199 : }
200 :
201 : // Hang on condition variable waiting until the compute status is `Running`.
202 572 : fn wait_for_postgres_start(compute: &ComputeNode) {
203 572 : let mut state = compute.state.lock().unwrap();
204 572 : while state.status != ComputeStatus::Running {
205 572 : info!("compute is not running, waiting before monitoring activity");
206 572 : state = compute.state_changed.wait(state).unwrap();
207 572 :
208 572 : if state.status == ComputeStatus::Running {
209 572 : break;
210 0 : }
211 : }
212 572 : }
213 :
214 : // Figure out the total active time and sessions across all non-system databases.
215 : // Returned tuple is `(active_time, sessions)`.
216 : // It can return `0.0` active time or `0` sessions, which means no user databases exist OR
217 : // it was a start with skipped `pg_catalog` updates and user didn't do any queries
218 : // (or open any sessions) yet.
219 0 : fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
220 0 : // Filter out `postgres` database as `compute_ctl` and other monitoring tools
221 0 : // like `postgres_exporter` use it to query Postgres statistics.
222 0 : // Use explicit 8 bytes type casts to match Rust types.
223 0 : let stats = cli.query_one(
224 0 : "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
225 0 : coalesce(sum(sessions), 0)::bigint AS total_sessions
226 0 : FROM pg_stat_database
227 0 : WHERE datname NOT IN (
228 0 : 'postgres',
229 0 : 'template0',
230 0 : 'template1'
231 0 : );",
232 0 : &[],
233 0 : );
234 0 : let stats = match stats {
235 0 : Ok(stats) => stats,
236 0 : Err(e) => {
237 0 : return Err(anyhow::anyhow!("could not query active_time: {}", e));
238 : }
239 : };
240 :
241 0 : let active_time: f64 = match stats.try_get("total_active_time") {
242 0 : Ok(active_time) => active_time,
243 0 : Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
244 : };
245 :
246 0 : let sessions: i64 = match stats.try_get("total_sessions") {
247 0 : Ok(sessions) => sessions,
248 0 : Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
249 : };
250 :
251 0 : Ok((active_time, sessions))
252 0 : }
253 :
254 : // Figure out the most recent state change time across all client backends.
255 : // If there is currently active backend, timestamp will be `Utc::now()`.
256 : // It can return `None`, which means no client backends exist or we were
257 : // unable to parse the timestamp.
258 11273 : fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
259 11273 : let mut last_active: Option<DateTime<Utc>> = None;
260 11273 : // Get all running client backends except ourself, use RFC3339 DateTime format.
261 11273 : let backends = cli.query(
262 11273 : "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
263 11273 : FROM pg_stat_activity
264 11273 : WHERE backend_type = 'client backend'
265 11273 : AND pid != pg_backend_pid()
266 11273 : AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
267 11273 : &[],
268 11273 : );
269 11273 :
270 11273 : match backends {
271 11227 : Ok(backs) => {
272 11227 : let mut idle_backs: Vec<DateTime<Utc>> = vec![];
273 :
274 11227 : for b in backs.into_iter() {
275 2 : let state: String = match b.try_get("state") {
276 2 : Ok(state) => state,
277 0 : Err(_) => continue,
278 : };
279 :
280 2 : if state == "idle" {
281 1 : let change: String = match b.try_get("state_change") {
282 1 : Ok(state_change) => state_change,
283 0 : Err(_) => continue,
284 : };
285 1 : let change = DateTime::parse_from_rfc3339(&change);
286 1 : match change {
287 1 : Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
288 0 : Err(e) => {
289 0 : info!("cannot parse backend state_change DateTime: {}", e);
290 0 : continue;
291 : }
292 : }
293 : } else {
294 : // Found non-idle backend, so the last activity is NOW.
295 : // Return immediately, no need to check other backends.
296 1 : return Ok(Some(Utc::now()));
297 : }
298 : }
299 :
300 : // Get idle backend `state_change` with the max timestamp.
301 11225 : if let Some(last) = idle_backs.iter().max() {
302 1 : last_active = Some(*last);
303 11224 : }
304 : }
305 46 : Err(e) => {
306 46 : return Err(anyhow::anyhow!("could not query backends: {}", e));
307 : }
308 : }
309 :
310 11225 : Ok(last_active)
311 11272 : }
312 :
313 : /// Launch a separate compute monitor thread and return its `JoinHandle`.
314 572 : pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
315 572 : let compute = Arc::clone(compute);
316 572 :
317 572 : thread::Builder::new()
318 572 : .name("compute-monitor".into())
319 572 : .spawn(move || watch_compute_activity(&compute))
320 572 : .expect("cannot launch compute monitor thread")
321 572 : }
|