Line data Source code
1 : use std::sync::Arc;
2 : use std::thread;
3 : use std::time::Duration;
4 :
5 : use chrono::{DateTime, Utc};
6 : use compute_api::responses::ComputeStatus;
7 : use compute_api::spec::ComputeFeature;
8 : use postgres::{Client, NoTls};
9 : use tracing::{debug, error, info, warn};
10 :
11 : use crate::compute::ComputeNode;
12 :
/// Pause between two consecutive iterations of the activity monitor loop.
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
14 :
15 : // Spin in a loop and figure out the last activity time in the Postgres.
16 : // Then update it in the shared state. This function never errors out.
17 : // NB: the only expected panic is at `Mutex` unwrap(), all other errors
18 : // should be handled gracefully.
19 0 : fn watch_compute_activity(compute: &ComputeNode) {
20 0 : // Suppose that `connstr` doesn't change
21 0 : let connstr = compute.params.connstr.clone();
22 0 : let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
23 0 :
24 0 : // During startup and configuration we connect to every Postgres database,
25 0 : // but we don't want to count this as some user activity. So wait until
26 0 : // the compute fully started before monitoring activity.
27 0 : wait_for_postgres_start(compute);
28 0 :
29 0 : // Define `client` outside of the loop to reuse existing connection if it's active.
30 0 : let mut client = conf.connect(NoTls);
31 0 :
32 0 : let mut sleep = false;
33 0 : let mut prev_active_time: Option<f64> = None;
34 0 : let mut prev_sessions: Option<i64> = None;
35 0 :
36 0 : if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
37 0 : info!("starting experimental activity monitor for {}", connstr);
38 : } else {
39 0 : info!("starting activity monitor for {}", connstr);
40 : }
41 :
42 : loop {
43 : // We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
44 : // But skip the first sleep, so we can connect to Postgres immediately.
45 0 : if sleep {
46 0 : // Should be outside of the mutex lock to allow others to read while we sleep.
47 0 : thread::sleep(MONITOR_CHECK_INTERVAL);
48 0 : } else {
49 0 : sleep = true;
50 0 : }
51 :
52 0 : match &mut client {
53 0 : Ok(cli) => {
54 0 : if cli.is_closed() {
55 0 : info!("connection to Postgres is closed, trying to reconnect");
56 :
57 : // Connection is closed, reconnect and try again.
58 0 : client = conf.connect(NoTls);
59 0 : continue;
60 0 : }
61 0 :
62 0 : // This is a new logic, only enable if the feature flag is set.
63 0 : // TODO: remove this once we are sure that it works OR drop it altogether.
64 0 : if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
65 : // First, check if the total active time or sessions across all databases has changed.
66 : // If it did, it means that user executed some queries. In theory, it can even go down if
67 : // some databases were dropped, but it's still a user activity.
68 0 : match get_database_stats(cli) {
69 0 : Ok((active_time, sessions)) => {
70 0 : let mut detected_activity = false;
71 0 :
72 0 : prev_active_time = match prev_active_time {
73 0 : Some(prev_active_time) => {
74 0 : if active_time != prev_active_time {
75 0 : detected_activity = true;
76 0 : }
77 0 : Some(active_time)
78 : }
79 0 : None => Some(active_time),
80 : };
81 0 : prev_sessions = match prev_sessions {
82 0 : Some(prev_sessions) => {
83 0 : if sessions != prev_sessions {
84 0 : detected_activity = true;
85 0 : }
86 0 : Some(sessions)
87 : }
88 0 : None => Some(sessions),
89 : };
90 :
91 0 : if detected_activity {
92 : // Update the last active time and continue, we don't need to
93 : // check backends state change.
94 0 : compute.update_last_active(Some(Utc::now()));
95 0 : continue;
96 0 : }
97 : }
98 0 : Err(e) => {
99 0 : error!("could not get database statistics: {}", e);
100 0 : continue;
101 : }
102 : }
103 0 : }
104 :
105 : // Second, if database statistics is the same, check all backends state change,
106 : // maybe there is some with more recent activity. `get_backends_state_change()`
107 : // can return None or stale timestamp, so it's `compute.update_last_active()`
108 : // responsibility to check if the new timestamp is more recent than the current one.
109 : // This helps us to discover new sessions, that did nothing yet.
110 0 : match get_backends_state_change(cli) {
111 0 : Ok(last_active) => {
112 0 : compute.update_last_active(last_active);
113 0 : }
114 0 : Err(e) => {
115 0 : error!("could not get backends state change: {}", e);
116 : }
117 : }
118 :
119 : // Finally, if there are existing (logical) walsenders, do not suspend.
120 : //
121 : // walproposer doesn't currently show up in pg_stat_replication,
122 : // but protect if it will be
123 0 : let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
124 0 : match cli.query_one(ws_count_query, &[]) {
125 0 : Ok(r) => match r.try_get::<&str, i64>("count") {
126 0 : Ok(num_ws) => {
127 0 : if num_ws > 0 {
128 0 : compute.update_last_active(Some(Utc::now()));
129 0 : continue;
130 0 : }
131 : }
132 0 : Err(e) => {
133 0 : warn!("failed to parse walsenders count: {:?}", e);
134 0 : continue;
135 : }
136 : },
137 0 : Err(e) => {
138 0 : warn!("failed to get list of walsenders: {:?}", e);
139 0 : continue;
140 : }
141 : }
142 : //
143 : // Don't suspend compute if there is an active logical replication subscription
144 : //
145 : // `where pid is not null` – to filter out read only computes and subscription on branches
146 : //
147 0 : let logical_subscriptions_query =
148 0 : "select count(*) from pg_stat_subscription where pid is not null;";
149 0 : match cli.query_one(logical_subscriptions_query, &[]) {
150 0 : Ok(row) => match row.try_get::<&str, i64>("count") {
151 0 : Ok(num_subscribers) => {
152 0 : if num_subscribers > 0 {
153 0 : compute.update_last_active(Some(Utc::now()));
154 0 : continue;
155 0 : }
156 : }
157 0 : Err(e) => {
158 0 : warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
159 0 : continue;
160 : }
161 : },
162 0 : Err(e) => {
163 0 : warn!(
164 0 : "failed to get list of active logical replication subscriptions: {:?}",
165 : e
166 : );
167 0 : continue;
168 : }
169 : }
170 : //
171 : // Do not suspend compute if autovacuum is running
172 : //
173 0 : let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
174 0 : match cli.query_one(autovacuum_count_query, &[]) {
175 0 : Ok(r) => match r.try_get::<&str, i64>("count") {
176 0 : Ok(num_workers) => {
177 0 : if num_workers > 0 {
178 0 : compute.update_last_active(Some(Utc::now()));
179 0 : continue;
180 0 : }
181 : }
182 0 : Err(e) => {
183 0 : warn!("failed to parse autovacuum workers count: {:?}", e);
184 0 : continue;
185 : }
186 : },
187 0 : Err(e) => {
188 0 : warn!("failed to get list of autovacuum workers: {:?}", e);
189 0 : continue;
190 : }
191 : }
192 : }
193 0 : Err(e) => {
194 0 : debug!("could not connect to Postgres: {}, retrying", e);
195 :
196 : // Establish a new connection and try again.
197 0 : client = conf.connect(NoTls);
198 : }
199 : }
200 : }
201 : }
202 :
203 : // Hang on condition variable waiting until the compute status is `Running`.
204 0 : fn wait_for_postgres_start(compute: &ComputeNode) {
205 0 : let mut state = compute.state.lock().unwrap();
206 0 : while state.status != ComputeStatus::Running {
207 0 : info!("compute is not running, waiting before monitoring activity");
208 0 : state = compute.state_changed.wait(state).unwrap();
209 0 :
210 0 : if state.status == ComputeStatus::Running {
211 0 : break;
212 0 : }
213 : }
214 0 : }
215 :
216 : // Figure out the total active time and sessions across all non-system databases.
217 : // Returned tuple is `(active_time, sessions)`.
218 : // It can return `0.0` active time or `0` sessions, which means no user databases exist OR
219 : // it was a start with skipped `pg_catalog` updates and user didn't do any queries
220 : // (or open any sessions) yet.
221 0 : fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
222 0 : // Filter out `postgres` database as `compute_ctl` and other monitoring tools
223 0 : // like `postgres_exporter` use it to query Postgres statistics.
224 0 : // Use explicit 8 bytes type casts to match Rust types.
225 0 : let stats = cli.query_one(
226 0 : "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
227 0 : coalesce(sum(sessions), 0)::bigint AS total_sessions
228 0 : FROM pg_stat_database
229 0 : WHERE datname NOT IN (
230 0 : 'postgres',
231 0 : 'template0',
232 0 : 'template1'
233 0 : );",
234 0 : &[],
235 0 : );
236 0 : let stats = match stats {
237 0 : Ok(stats) => stats,
238 0 : Err(e) => {
239 0 : return Err(anyhow::anyhow!("could not query active_time: {}", e));
240 : }
241 : };
242 :
243 0 : let active_time: f64 = match stats.try_get("total_active_time") {
244 0 : Ok(active_time) => active_time,
245 0 : Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)),
246 : };
247 :
248 0 : let sessions: i64 = match stats.try_get("total_sessions") {
249 0 : Ok(sessions) => sessions,
250 0 : Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)),
251 : };
252 :
253 0 : Ok((active_time, sessions))
254 0 : }
255 :
256 : // Figure out the most recent state change time across all client backends.
257 : // If there is currently active backend, timestamp will be `Utc::now()`.
258 : // It can return `None`, which means no client backends exist or we were
259 : // unable to parse the timestamp.
260 0 : fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
261 0 : let mut last_active: Option<DateTime<Utc>> = None;
262 0 : // Get all running client backends except ourself, use RFC3339 DateTime format.
263 0 : let backends = cli.query(
264 0 : "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
265 0 : FROM pg_stat_activity
266 0 : WHERE backend_type = 'client backend'
267 0 : AND pid != pg_backend_pid()
268 0 : AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
269 0 : &[],
270 0 : );
271 0 :
272 0 : match backends {
273 0 : Ok(backs) => {
274 0 : let mut idle_backs: Vec<DateTime<Utc>> = vec![];
275 :
276 0 : for b in backs.into_iter() {
277 0 : let state: String = match b.try_get("state") {
278 0 : Ok(state) => state,
279 0 : Err(_) => continue,
280 : };
281 :
282 0 : if state == "idle" {
283 0 : let change: String = match b.try_get("state_change") {
284 0 : Ok(state_change) => state_change,
285 0 : Err(_) => continue,
286 : };
287 0 : let change = DateTime::parse_from_rfc3339(&change);
288 0 : match change {
289 0 : Ok(t) => idle_backs.push(t.with_timezone(&Utc)),
290 0 : Err(e) => {
291 0 : info!("cannot parse backend state_change DateTime: {}", e);
292 0 : continue;
293 : }
294 : }
295 : } else {
296 : // Found non-idle backend, so the last activity is NOW.
297 : // Return immediately, no need to check other backends.
298 0 : return Ok(Some(Utc::now()));
299 : }
300 : }
301 :
302 : // Get idle backend `state_change` with the max timestamp.
303 0 : if let Some(last) = idle_backs.iter().max() {
304 0 : last_active = Some(*last);
305 0 : }
306 : }
307 0 : Err(e) => {
308 0 : return Err(anyhow::anyhow!("could not query backends: {}", e));
309 : }
310 : }
311 :
312 0 : Ok(last_active)
313 0 : }
314 :
315 : /// Launch a separate compute monitor thread and return its `JoinHandle`.
316 0 : pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
317 0 : let compute = Arc::clone(compute);
318 0 :
319 0 : thread::Builder::new()
320 0 : .name("compute-monitor".into())
321 0 : .spawn(move || watch_compute_activity(&compute))
322 0 : .expect("cannot launch compute monitor thread")
323 0 : }
|