use std::sync::Arc;
use std::thread;
use std::time::Duration;

use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};

use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};

const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);

/// Struct to store the runtime state of the compute monitor thread.
/// In theory, this could be a part of `Compute`, but i) this state
/// is expected to be accessed by a single thread only, so we don't
/// need to care about locking; ii) `Compute` is already quite big.
/// Thus, it seems to be a good idea to keep all the activity/health
/// monitoring parts here.
struct ComputeMonitor {
    compute: Arc<ComputeNode>,

    /// The last moment when Postgres had some activity
    /// that should prevent the compute from being suspended.
    last_active: Option<DateTime<Utc>>,

    /// The moment when we last tried to check Postgres.
    last_checked: DateTime<Utc>,
    /// The last moment we did a successful Postgres check.
    last_up: DateTime<Utc>,

    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    active_time: Option<f64>,
    /// Only used for internal statistics change tracking
    /// between monitor runs and can be outdated.
    sessions: Option<i64>,

    /// Use the experimental statistics-based activity monitor. It's no longer
    /// 'experimental' per se, as it's enabled for everyone, but we still
    /// keep the flag as an option to turn it off in some cases if it
    /// misbehaves.
    experimental: bool,
}

impl ComputeMonitor {
    fn report_down(&self) {
        let now = Utc::now();

        // Calculate and report current downtime
        // (since the last time Postgres was up)
        let downtime = now.signed_duration_since(self.last_up);
        PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);

        // Calculate and update total downtime
        // (cumulative duration of Postgres downtime in ms)
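        // NB: the increment starts at `last_checked` (not `last_up`), so across
        // several consecutive failed checks each check interval is added to the
        // total exactly once.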
        let inc = now
            .signed_duration_since(self.last_checked)
            .num_milliseconds();
        PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
    }

    fn report_up(&mut self) {
        self.last_up = Utc::now();
        PG_CURR_DOWNTIME_MS.set(0.0);
    }

    fn downtime_info(&self) -> String {
        format!(
            "total_ms: {}, current_ms: {}, last_up: {}",
            PG_TOTAL_DOWNTIME_MS.get(),
            PG_CURR_DOWNTIME_MS.get(),
            self.last_up
        )
    }

    /// Check if the compute is in some terminal or soon-to-be-terminal
    /// state; if so, return `true`, signalling the caller that it
    /// should exit gracefully. Otherwise, return `false`.
    fn check_interrupts(&mut self) -> bool {
        let compute_status = self.compute.get_status();
        if matches!(
            compute_status,
            ComputeStatus::Terminated
                | ComputeStatus::TerminationPending { .. }
                | ComputeStatus::Failed
        ) {
            info!(
                "compute is in {} status, stopping compute monitor",
                compute_status
            );
            return true;
        }

        false
    }

    /// Spin in a loop and figure out the last activity time in Postgres.
    /// Then update it in the shared state. This function currently never
    /// errors out explicitly, but there is a graceful termination path.
    /// Every time we receive an error trying to check Postgres, we call
    /// [`ComputeMonitor::check_interrupts()`], because it could be that
    /// the compute is already being terminated; in that case we can exit
    /// gracefully to avoid producing error noise in the log.
    /// NB: the only expected panic is at `Mutex` unwrap(); all other errors
    /// should be handled gracefully.
    #[instrument(skip_all)]
    pub fn run(&mut self) -> anyhow::Result<()> {
        // Assume that `connstr` doesn't change.
        let connstr = self.compute.params.connstr.clone();
        let conf = self
            .compute
            .get_conn_conf(Some("compute_ctl:compute_monitor"));

        // During startup and configuration we connect to every Postgres database,
        // but we don't want to count this as user activity. So wait until the
        // compute has fully started before monitoring activity.
        wait_for_postgres_start(&self.compute);

        // Define `client` outside of the loop to reuse the existing connection if it's active.
        let mut client = conf.connect(NoTls);

        info!("starting compute monitor for {}", connstr);

        loop {
            if self.check_interrupts() {
                break;
            }

            match &mut client {
                Ok(cli) => {
                    if cli.is_closed() {
                        info!(
                            downtime_info = self.downtime_info(),
                            "connection to Postgres is closed, trying to reconnect"
                        );
                        if self.check_interrupts() {
                            break;
                        }

                        self.report_down();

                        // Connection is closed, reconnect and try again.
                        client = conf.connect(NoTls);
                    } else {
                        match self.check(cli) {
                            Ok(_) => {
                                self.report_up();
                                self.compute.update_last_active(self.last_active);
                            }
                            Err(e) => {
                                error!(
                                    downtime_info = self.downtime_info(),
                                    "could not check Postgres: {}", e
                                );
                                if self.check_interrupts() {
                                    break;
                                }

                                // Although there are many places where `check()` can return an
                                // error, normally it shouldn't happen. I.e., we will likely return
                                // an error if the connection got broken, a query timed out,
                                // Postgres returned invalid data, etc. In all such cases it's
                                // suspicious, so let's report this as downtime.
                                self.report_down();

                                // Reconnect to Postgres just in case. During tests, I noticed
                                // that queries in `check()` can fail with `connection closed`,
                                // but `cli.is_closed()` above doesn't detect it. Even if the old
                                // connection is still alive, it will be dropped when we reassign
                                // `client` to a new connection.
                                client = conf.connect(NoTls);
                            }
                        }
                    }
                }
                Err(e) => {
                    info!(
                        downtime_info = self.downtime_info(),
                        "could not connect to Postgres: {}, retrying", e
                    );
                    if self.check_interrupts() {
                        break;
                    }

                    self.report_down();

                    // Establish a new connection and try again.
                    client = conf.connect(NoTls);
                }
            }

            // Reset the `last_checked` timestamp and sleep before the next iteration.
            self.last_checked = Utc::now();
            thread::sleep(MONITOR_CHECK_INTERVAL);
        }

        // Graceful termination path
        Ok(())
    }

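    /// Run a single activity check against Postgres. Activity is detected from,
    /// in order: database statistics deltas (if the experimental monitor is
    /// enabled), client backend state changes, logical walsenders, active
    /// logical replication subscriptions, and running autovacuum workers.
    /// On the first detected activity, `last_active` is updated and the
    /// remaining checks are skipped.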
    #[instrument(skip_all)]
    fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
        // This is new logic, only enabled if the feature flag is set.
        // TODO: remove this once we are sure that it works OR drop it altogether.
        if self.experimental {
            // Check if the total active time or sessions across all databases have changed.
            // If they did, it means that the user executed some queries. In theory, the totals
            // can even go down if some databases were dropped, but that's still user activity.
            match get_database_stats(cli) {
                Ok((active_time, sessions)) => {
                    let mut detected_activity = false;

                    if let Some(prev_active_time) = self.active_time {
                        if active_time != prev_active_time {
                            detected_activity = true;
                        }
                    }
                    self.active_time = Some(active_time);

                    if let Some(prev_sessions) = self.sessions {
                        if sessions != prev_sessions {
                            detected_activity = true;
                        }
                    }
                    self.sessions = Some(sessions);

                    if detected_activity {
                        // Update the last active time and return; we don't need to
                        // check backends' state changes.
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("could not get database statistics: {}", e));
                }
            }
        }

        // If database statistics are the same, check all backends for state changes.
        // Maybe there are some with more recent activity. `get_backends_state_change()`
        // can return None or a stale timestamp, so it's `compute.update_last_active()`'s
        // responsibility to check whether the new timestamp is more recent than the current
        // one. This helps us to discover new sessions that have not done anything yet.
        match get_backends_state_change(cli) {
            Ok(last_active) => match (last_active, self.last_active) {
                (Some(last_active), Some(prev_last_active)) => {
                    if last_active > prev_last_active {
                        self.last_active = Some(last_active);
                        return Ok(());
                    }
                }
                (Some(last_active), None) => {
                    self.last_active = Some(last_active);
                    return Ok(());
                }
                _ => {}
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "could not get backends state change: {}",
                    e
                ));
            }
        }

        // If there are existing (logical) walsenders, do not suspend.
        //
        // N.B. walproposer doesn't currently show up in pg_stat_replication,
        // but guard against it in case it ever does.
        const WS_COUNT_QUERY: &str =
            "select count(*) from pg_stat_replication where application_name != 'walproposer';";
        match cli.query_one(WS_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_ws) => {
                    if num_ws > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    let err: anyhow::Error = e.into();
                    return Err(err.context("failed to parse walsenders count"));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
            }
        }

        // Don't suspend the compute if there is an active logical replication subscription.
        //
        // `where pid is not null` filters out read-only computes and subscriptions on branches.
        const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
            "select count(*) from pg_stat_subscription where pid is not null;";
        match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
            Ok(row) => match row.try_get::<&str, i64>("count") {
                Ok(num_subscribers) => {
                    if num_subscribers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse 'pg_stat_subscription' count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of active logical replication subscriptions: {}",
                    e
                ));
            }
        }

        // Do not suspend the compute if autovacuum is running.
        const AUTOVACUUM_COUNT_QUERY: &str =
            "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
        match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
            Ok(r) => match r.try_get::<&str, i64>("count") {
                Ok(num_workers) => {
                    if num_workers > 0 {
                        self.last_active = Some(Utc::now());
                        return Ok(());
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "failed to parse autovacuum workers count: {}",
                        e
                    ));
                }
            },
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to get list of autovacuum workers: {}",
                    e
                ));
            }
        }

        Ok(())
    }
}

// Block on the condition variable, waiting until the compute status is `Running`.
fn wait_for_postgres_start(compute: &ComputeNode) {
    let mut state = compute.state.lock().unwrap();
    while state.status != ComputeStatus::Running {
        info!("compute is not running, waiting before monitoring activity");
        state = compute.state_changed.wait(state).unwrap();

        if state.status == ComputeStatus::Running {
            break;
        }
    }
}

// Figure out the total active time and number of sessions across all non-system databases.
// The returned tuple is `(active_time, sessions)`.
// It can return `0.0` active time or `0` sessions, which means that either no user databases
// exist OR the compute started with skipped `pg_catalog` updates and the user hasn't run any
// queries (or opened any sessions) yet.
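// NB: the `active_time` and `sessions` columns of `pg_stat_database` are only
// available in PostgreSQL 14 and newer.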
fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
    // Filter out the `postgres` database, as `compute_ctl` and other monitoring tools
    // like `postgres_exporter` use it to query Postgres statistics.
    // Use explicit 8-byte type casts to match the Rust types.
    let stats = cli.query_one(
        "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
            coalesce(sum(sessions), 0)::bigint AS total_sessions
        FROM pg_stat_database
        WHERE datname NOT IN (
                'postgres',
                'template0',
                'template1'
            );",
        &[],
    );
    let stats = match stats {
        Ok(stats) => stats,
        Err(e) => {
            return Err(anyhow::anyhow!("could not query active_time: {}", e));
        }
    };

    let active_time: f64 = match stats.try_get("total_active_time") {
        Ok(active_time) => active_time,
        Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
    };

    let sessions: i64 = match stats.try_get("total_sessions") {
        Ok(sessions) => sessions,
        Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
    };

    Ok((active_time, sessions))
}

// Figure out the most recent state change time across all client backends.
// If there is a currently active backend, the timestamp will be `Utc::now()`.
// It can return `None`, which means that no client backends exist or we were
// unable to parse the timestamp.
fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
    let mut last_active: Option<DateTime<Utc>> = None;
    // Get all running client backends except ourselves, using the RFC 3339 DateTime format.
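    // NB: `to_char()` renders `state_change` in the session's `TimeZone`, while the
    // literal "Z" suffix labels the result as UTC, so this relies on the monitor's
    // session timezone being UTC.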
    let backends = cli.query(
        "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
        FROM pg_stat_activity
            WHERE backend_type = 'client backend'
                AND pid != pg_backend_pid()
                AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
        &[],
    );

    match backends {
        Ok(backs) => {
            let mut idle_backs: Vec<DateTime<Utc>> = vec![];

            for b in backs.into_iter() {
                let state: String = match b.try_get("state") {
                    Ok(state) => state,
                    Err(_) => continue,
                };

                if state == "idle" {
                    let change: String = match b.try_get("state_change") {
                        Ok(state_change) => state_change,
                        Err(_) => continue,
                    };
                    let change = DateTime::parse_from_rfc3339(&change);
                    match change {
                        Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
                        Err(e) => {
                            info!("cannot parse backend state_change DateTime: {}", e);
                            continue;
                        }
                    }
                } else {
                    // Found non-idle backend, so the last activity is NOW.
                    // Return immediately, no need to check other backends.
                    return Ok(Some(Utc::now()));
                }
            }

            // Get idle backend `state_change` with the max timestamp.
            if let Some(last) = idle_backs.iter().max() {
                last_active = Some(*last);
            }
        }
        Err(e) => {
            return Err(anyhow::anyhow!("could not query backends: {}", e));
        }
    }

    Ok(last_active)
}

/// Launch a separate compute monitor thread and return its `JoinHandle`.
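///
/// A minimal usage sketch (hypothetical caller), assuming a fully initialized
/// `Arc<ComputeNode>` named `compute`:
///
/// ```ignore
/// let handle = launch_monitor(&compute);
/// // ... run the compute; the monitor exits once the compute reaches a terminal state ...
/// handle.join().expect("compute monitor thread panicked");
/// ```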
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
    let compute = Arc::clone(compute);
    let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
    let now = Utc::now();
    let mut monitor = ComputeMonitor {
        compute,
        last_active: None,
        last_checked: now,
        last_up: now,
        active_time: None,
        sessions: None,
        experimental,
    };

    thread::Builder::new()
        .name("compute-monitor".into())
        .spawn(move || {
            let span = span!(Level::INFO, "compute_monitor");
            let _enter = span.enter();
            match monitor.run() {
                Ok(_) => info!("compute monitor thread terminated gracefully"),
                Err(err) => error!("compute monitor thread terminated abnormally: {:?}", err),
            }
        })
        .expect("cannot launch compute monitor thread")
}