use std::sync::Arc;
use std::thread;
use std::time::Duration;

use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};

use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};

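/// How often the monitor polls Postgres; the main loop sleeps for this long between checks.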
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);

/// Struct to store runtime state of the compute monitor thread.
/// In theory, this could be a part of `Compute`, but i)
/// this state is expected to be accessed only by a single thread,
/// so we don't need to care about locking; ii) `Compute` is
/// already quite big. Thus, it seems to be a good idea to keep
/// all the activity/health monitoring parts here.
struct ComputeMonitor {
    compute: Arc<ComputeNode>,

    /// The moment when Postgres last had some activity
    /// that should prevent the compute from being suspended.
    last_active: Option<DateTime<Utc>>,

    /// The moment when we last tried to check Postgres.
    last_checked: DateTime<Utc>,
    /// The last moment we did a successful Postgres check.
    last_up: DateTime<Utc>,

    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    active_time: Option<f64>,
    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    sessions: Option<i64>,

    /// Use the experimental statistics-based activity monitor. It's no longer
    /// 'experimental' per se, as it's enabled for everyone, but we still
    /// keep the flag as an option to turn it off in some cases if it
    /// misbehaves.
    experimental: bool,
}

impl ComputeMonitor {
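    /// Report that Postgres is down: set the current-downtime gauge and add
    /// the time elapsed since the last check to the cumulative downtime counter.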
    fn report_down(&self) {
        let now = Utc::now();

        // Calculate and report current downtime
        // (since the last time Postgres was up).
        let downtime = now.signed_duration_since(self.last_up);
        PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);

        // Calculate and update total downtime
        // (cumulative duration of Postgres downtime in ms).
        let inc = now
            .signed_duration_since(self.last_checked)
            .num_milliseconds();
        PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
    }

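    /// Report that Postgres is up: remember the timestamp and reset the
    /// current-downtime gauge.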
    fn report_up(&mut self) {
        self.last_up = Utc::now();
        PG_CURR_DOWNTIME_MS.set(0.0);
    }

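    /// Render a short downtime summary for use in log messages.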
    fn downtime_info(&self) -> String {
        format!(
            "total_ms: {}, current_ms: {}, last_up: {}",
            PG_TOTAL_DOWNTIME_MS.get(),
            PG_CURR_DOWNTIME_MS.get(),
            self.last_up
        )
    }

    /// Check if the compute is in some terminal or soon-to-be-terminal
    /// state; if it is, return `true`, signalling the caller that it
    /// should exit gracefully. Otherwise, return `false`.
    fn check_interrupts(&mut self) -> bool {
        let compute_status = self.compute.get_status();
        if matches!(
            compute_status,
            ComputeStatus::Terminated
                | ComputeStatus::TerminationPendingFast
                | ComputeStatus::TerminationPendingImmediate
                | ComputeStatus::Failed
        ) {
            info!(
                "compute is in {} status, stopping compute monitor",
                compute_status
            );
            return true;
        }

        false
    }

    /// Spin in a loop and figure out the last activity time in Postgres,
    /// then update it in the shared state. This function currently never
    /// errors out explicitly, but there is a graceful termination path.
    /// Every time we receive an error trying to check Postgres, we call
    /// [`ComputeMonitor::check_interrupts()`], because the compute could
    /// already be terminating, in which case we exit gracefully
    /// to avoid producing error noise in the log.
    /// NB: the only expected panic is at `Mutex` unwrap(); all other errors
    /// should be handled gracefully.
    #[instrument(skip_all)]
    pub fn run(&mut self) -> anyhow::Result<()> {
        // Assume that `connstr` doesn't change.
        let connstr = self.compute.params.connstr.clone();
        let conf = self
            .compute
            .get_conn_conf(Some("compute_ctl:compute_monitor"));

        // During startup and configuration we connect to every Postgres database,
        // but we don't want to count this as user activity. So wait until
        // the compute has fully started before monitoring activity.
        wait_for_postgres_start(&self.compute);

        // Define `client` outside of the loop to reuse the existing connection if it's active.
        let mut client = conf.connect(NoTls);

        info!("starting compute monitor for {}", connstr);

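        // Main monitoring loop: poll Postgres every `MONITOR_CHECK_INTERVAL`,
        // recording downtime and reconnecting on failures, until the compute
        // reaches a terminal state.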
        loop {
            if self.check_interrupts() {
                break;
            }

            match &mut client {
                Ok(cli) => {
                    if cli.is_closed() {
                        info!(
                            downtime_info = self.downtime_info(),
                            "connection to Postgres is closed, trying to reconnect"
                        );
                        if self.check_interrupts() {
                            break;
                        }

                        self.report_down();

                        // Connection is closed, reconnect and try again.
                        client = conf.connect(NoTls);
                    } else {
                        match self.check(cli) {
                            Ok(_) => {
                                self.report_up();
                                self.compute.update_last_active(self.last_active);
                            }
                            Err(e) => {
                                error!(
                                    downtime_info = self.downtime_info(),
                                    "could not check Postgres: {}", e
                                );
                                if self.check_interrupts() {
                                    break;
                                }

                                // Although there are many places where `check()` can return an error,
                                // normally it shouldn't happen. I.e., we will likely return an error if
                                // the connection got broken, a query timed out, Postgres returned invalid
                                // data, etc. All such cases are suspicious, so let's report this as downtime.
                                self.report_down();

                                // Reconnect to Postgres just in case. During tests, I noticed
                                // that queries in `check()` can fail with `connection closed`,
                                // but `cli.is_closed()` above doesn't detect it. Even if the old
                                // connection is still alive, it will be dropped when we reassign
                                // `client` to a new connection.
                                client = conf.connect(NoTls);
                            }
                        }
                    }
                }
                Err(e) => {
                    info!(
                        downtime_info = self.downtime_info(),
                        "could not connect to Postgres: {}, retrying", e
                    );
                    if self.check_interrupts() {
                        break;
                    }

                    self.report_down();

                    // Establish a new connection and try again.
                    client = conf.connect(NoTls);
                }
            }

            // Reset the `last_checked` timestamp and sleep before the next iteration.
            self.last_checked = Utc::now();
            thread::sleep(MONITOR_CHECK_INTERVAL);
        }

        // Graceful termination path
        Ok(())
    }

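    /// Run one round of activity checks against Postgres: compare database
    /// statistics (when `experimental` is set), look for client backend state
    /// changes, and treat active walsenders, logical replication subscriptions,
    /// and autovacuum workers as activity. Updates `self.last_active` whenever
    /// activity is detected and returns an error if any of the queries fails.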
    #[instrument(skip_all)]
    fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
        // This is new logic, only enabled if the feature flag is set.
        // TODO: remove this once we are sure that it works OR drop it altogether.
        if self.experimental {
            // Check if the total active time or sessions across all databases have changed.
            // If they did, it means that the user executed some queries. In theory, they can
            // even go down if some databases were dropped, but it's still user activity.
            match get_database_stats(cli) {
                Ok((active_time, sessions)) => {
                    let mut detected_activity = false;

                    if let Some(prev_active_time) = self.active_time {
                        if active_time != prev_active_time {
                            detected_activity = true;
                        }
                    }
                    self.active_time = Some(active_time);

                    if let Some(prev_sessions) = self.sessions {
                        if sessions != prev_sessions {
                            detected_activity = true;
                        }
                    }
                    self.sessions = Some(sessions);

                    if detected_activity {
                        // Update the last active time and continue; we don't need to
                        // check backend state changes.
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("could not get database statistics: {}", e));
                }
            }
        }

        // If database statistics are the same, check all backends for state changes.
        // Maybe there are some with more recent activity. `get_backends_state_change()`
        // can return None or a stale timestamp, so it's `compute.update_last_active()`'s
        // responsibility to check if the new timestamp is more recent than the current one.
        // This helps us discover new sessions that have not done anything yet.
        match get_backends_state_change(cli) {
            Ok(last_active) => match (last_active, self.last_active) {
                (Some(last_active), Some(prev_last_active)) => {
                    if last_active > prev_last_active {
                        self.last_active = Some(last_active);
                        return Ok(());
                    }
                }
                (Some(last_active), None) => {
                    self.last_active = Some(last_active);
                    return Ok(());
                }
                _ => {}
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "could not get backends state change: {}",
                    e
                ));
            }
        }

        // If there are existing (logical) walsenders, do not suspend.
        //
        // N.B. walproposer doesn't currently show up in pg_stat_replication,
        // but protect against it if it ever does.
        const WS_COUNT_QUERY: &str =
            "select count(*) from pg_stat_replication where application_name != 'walproposer';";
        match cli.query_one(WS_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_ws) => {
                    if num_ws > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    let err: anyhow::Error = e.into();
                    return Err(err.context("failed to parse walsenders count"));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
            }
        }

        // Don't suspend the compute if there is an active logical replication subscription.
        //
        // `where pid is not null` filters out read-only computes and subscriptions on branches.
        const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
            "select count(*) from pg_stat_subscription where pid is not null;";
        match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
            Ok(row) => match row.try_get::<&str, i64>("count") {
                Ok(num_subscribers) => {
                    if num_subscribers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse 'pg_stat_subscription' count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of active logical replication subscriptions: {}",
                    e
                ));
            }
        }

        // Do not suspend the compute if autovacuum is running.
        const AUTOVACUUM_COUNT_QUERY: &str =
            "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
        match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_workers) => {
                    if num_workers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse autovacuum workers count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of autovacuum workers: {}",
                    e
                ));
            }
        }

        Ok(())
    }
}

// Hang on the condition variable, waiting until the compute status is `Running`.
fn wait_for_postgres_start(compute: &ComputeNode) {
    let mut state = compute.state.lock().unwrap();
    while state.status != ComputeStatus::Running {
        info!("compute is not running, waiting before monitoring activity");
        state = compute.state_changed.wait(state).unwrap();

        if state.status == ComputeStatus::Running {
            break;
        }
    }
}

// Figure out the total active time and sessions across all non-system databases.
// The returned tuple is `(active_time, sessions)`.
// It can return `0.0` active time or `0` sessions, which means that no user databases exist OR
// it was a start with skipped `pg_catalog` updates and the user hasn't run any queries
// (or opened any sessions) yet.
fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
    // Filter out the `postgres` database, as `compute_ctl` and other monitoring tools
    // like `postgres_exporter` use it to query Postgres statistics.
    // Use explicit 8-byte type casts to match the Rust types.
    let stats = cli.query_one(
        "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
            coalesce(sum(sessions), 0)::bigint AS total_sessions
        FROM pg_stat_database
        WHERE datname NOT IN (
                'postgres',
                'template0',
                'template1'
            );",
        &[],
    );
    let stats = match stats {
        Ok(stats) => stats,
        Err(e) => {
            return Err(anyhow::anyhow!("could not query active_time: {}", e));
        }
    };

    let active_time: f64 = match stats.try_get("total_active_time") {
        Ok(active_time) => active_time,
        Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
    };

    let sessions: i64 = match stats.try_get("total_sessions") {
        Ok(sessions) => sessions,
        Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
    };

    Ok((active_time, sessions))
}

// Figure out the most recent state change time across all client backends.
// If there is a currently active backend, the timestamp will be `Utc::now()`.
// It can return `None`, which means that no client backends exist or we were
// unable to parse the timestamp.
fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
    let mut last_active: Option<DateTime<Utc>> = None;
    // Get all running client backends except ourselves; use the RFC 3339 DateTime format.
    let backends = cli.query(
        "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
            FROM pg_stat_activity
            WHERE backend_type = 'client backend'
                AND pid != pg_backend_pid()
                AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
        &[],
    );

    match backends {
        Ok(backs) => {
            let mut idle_backs: Vec<DateTime<Utc>> = vec![];

            for b in backs.into_iter() {
                let state: String = match b.try_get("state") {
                    Ok(state) => state,
                    Err(_) => continue,
                };

                if state == "idle" {
                    let change: String = match b.try_get("state_change") {
                        Ok(state_change) => state_change,
                        Err(_) => continue,
                    };
                    let change = DateTime::parse_from_rfc3339(&change);
                    match change {
                        Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
                        Err(e) => {
                            info!("cannot parse backend state_change DateTime: {}", e);
                            continue;
                        }
                    }
                } else {
                    // Found a non-idle backend, so the last activity is NOW.
                    // Return immediately, no need to check other backends.
                    return Ok(Some(Utc::now()));
                }
            }

            // Get the idle backend `state_change` with the max timestamp.
            if let Some(last) = idle_backs.iter().max() {
                last_active = Some(*last);
            }
        }
        Err(e) => {
            return Err(anyhow::anyhow!("could not query backends: {}", e));
        }
    }

    Ok(last_active)
}

/// Launch a separate compute monitor thread and return its `JoinHandle`.
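///
/// A minimal usage sketch (hypothetical caller; assumes an `Arc<ComputeNode>`
/// has already been constructed elsewhere):
///
/// ```ignore
/// let handle = launch_monitor(&compute);
/// // ... do the rest of compute_ctl's work ...
/// handle.join().expect("compute monitor thread panicked");
/// ```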
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
    let compute = Arc::clone(compute);
    let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
    let now = Utc::now();
    let mut monitor = ComputeMonitor {
        compute,
        last_active: None,
        last_checked: now,
        last_up: now,
        active_time: None,
        sessions: None,
        experimental,
    };

    thread::Builder::new()
        .name("compute-monitor".into())
        .spawn(move || {
            let span = span!(Level::INFO, "compute_monitor");
            let _enter = span.enter();
            match monitor.run() {
                Ok(_) => info!("compute monitor thread terminated gracefully"),
                Err(err) => error!("compute monitor thread terminated abnormally: {:?}", err),
            }
        })
        .expect("cannot launch compute monitor thread")
}