Line data Source code
//! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
9 : use anyhow::Result;
10 : use futures::Future;
11 : use metrics::{
12 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
13 : proto::MetricFamily,
14 : register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
15 : IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
16 : };
17 : use once_cell::sync::Lazy;
18 :
19 : use postgres_ffi::XLogSegNo;
20 : use utils::pageserver_feedback::PageserverFeedback;
21 : use utils::{id::TenantTimelineId, lsn::Lsn};
22 :
23 : use crate::{
24 : state::{TimelineMemState, TimelinePersistentState},
25 : GlobalTimelines,
26 : };
27 :
28 : // Global metrics across all timelines.
29 433 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
30 433 : register_histogram!(
31 433 : "safekeeper_write_wal_bytes",
32 433 : "Bytes written to WAL in a single request",
33 433 : vec![
34 433 : 1.0,
35 433 : 10.0,
36 433 : 100.0,
37 433 : 1024.0,
38 433 : 8192.0,
39 433 : 128.0 * 1024.0,
40 433 : 1024.0 * 1024.0,
41 433 : 10.0 * 1024.0 * 1024.0
42 433 : ]
43 433 : )
44 433 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
45 433 : });
46 433 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
47 433 : register_histogram!(
48 433 : "safekeeper_write_wal_seconds",
49 433 : "Seconds spent writing and syncing WAL to a disk in a single request",
50 433 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
51 433 : )
52 433 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
53 433 : });
54 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
55 0 : register_histogram!(
56 0 : "safekeeper_flush_wal_seconds",
57 0 : "Seconds spent syncing WAL to a disk",
58 0 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
59 0 : )
60 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
61 0 : });
62 441 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
63 441 : register_histogram!(
64 441 : "safekeeper_persist_control_file_seconds",
65 441 : "Seconds to persist and sync control file",
66 441 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
67 441 : )
68 441 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
69 441 : });
70 442 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
71 442 : register_int_counter_vec!(
72 442 : "safekeeper_pg_io_bytes_total",
73 442 : "Bytes read from or written to any PostgreSQL connection",
74 442 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
75 442 : )
76 442 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
77 442 : });
78 429 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
79 429 : register_int_counter!(
80 429 : "safekeeper_broker_pushed_updates_total",
81 429 : "Number of timeline updates pushed to the broker"
82 429 : )
83 429 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
84 429 : });
85 510 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
86 510 : register_int_counter_vec!(
87 510 : "safekeeper_broker_pulled_updates_total",
88 510 : "Number of timeline updates pulled and processed from the broker",
89 510 : &["result"]
90 510 : )
91 510 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
92 510 : });
93 442 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
94 884 : register_int_counter_pair_vec!(
95 884 : "safekeeper_pg_queries_received_total",
96 884 : "Number of queries received through pg protocol",
97 884 : "safekeeper_pg_queries_finished_total",
98 884 : "Number of queries finished through pg protocol",
99 884 : &["query"]
100 884 : )
101 442 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
102 442 : });
103 7 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
104 7 : register_int_counter!(
105 7 : "safekeeper_removed_wal_segments_total",
106 7 : "Number of WAL segments removed from the disk"
107 7 : )
108 7 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
109 7 : });
110 6 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
111 6 : register_int_counter!(
112 6 : "safekeeper_backed_up_segments_total",
113 6 : "Number of WAL segments backed up to the S3"
114 6 : )
115 6 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
116 6 : });
117 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
118 0 : register_int_counter!(
119 0 : "safekeeper_backup_errors_total",
120 0 : "Number of errors during backup"
121 0 : )
122 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
123 0 : });
124 510 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
125 510 : register_histogram!(
126 510 : "safekeeper_broker_push_update_seconds",
127 510 : "Seconds to push all timeline updates to the broker",
128 510 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
129 510 : )
130 510 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
131 510 : });
/// Histogram buckets for counts of timelines, from 1 up to 50 000 in a
/// 1-2-5 style progression.
#[rustfmt::skip]
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1.0, 10.0, 50.0,
    100.0, 200.0, 500.0,
    1000.0, 2000.0, 5000.0,
    10000.0, 20000.0, 50000.0,
];
135 510 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
136 510 : register_histogram!(
137 510 : "safekeeper_broker_iteration_timelines",
138 510 : "Count of timelines pushed to the broker in a single iteration",
139 510 : TIMELINES_COUNT_BUCKETS.to_vec()
140 510 : )
141 510 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
142 510 : });
143 :
/// Placeholder value for traffic-metric labels that have not been set yet.
/// It also serves as the `same_az` value when either zone is unknown.
pub const LABEL_UNKNOWN: &str = "unknown";
145 :
/// Label values identifying the traffic counters of one connection in
/// `PG_IO_BYTES`.
#[derive(Clone)]
struct ConnectionLabels {
    /// Availability zone the connection originates from.
    client_az: String,
    /// Availability zone this safekeeper runs in.
    sk_az: String,
    /// Application name reported by the client.
    app_name: String,
}
156 :
157 : impl ConnectionLabels {
158 3292 : fn new() -> Self {
159 3292 : Self {
160 3292 : client_az: LABEL_UNKNOWN.to_string(),
161 3292 : sk_az: LABEL_UNKNOWN.to_string(),
162 3292 : app_name: LABEL_UNKNOWN.to_string(),
163 3292 : }
164 3292 : }
165 :
166 7338 : fn build_metrics(
167 7338 : &self,
168 7338 : ) -> (
169 7338 : GenericCounter<metrics::core::AtomicU64>,
170 7338 : GenericCounter<metrics::core::AtomicU64>,
171 7338 : ) {
172 7338 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
173 7338 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
174 4 : (client_az, sk_az) => {
175 4 : if client_az == sk_az {
176 0 : "true"
177 : } else {
178 4 : "false"
179 : }
180 : }
181 : };
182 :
183 7338 : let read = PG_IO_BYTES.with_label_values(&[
184 7338 : &self.client_az,
185 7338 : &self.sk_az,
186 7338 : &self.app_name,
187 7338 : "read",
188 7338 : same_az,
189 7338 : ]);
190 7338 : let write = PG_IO_BYTES.with_label_values(&[
191 7338 : &self.client_az,
192 7338 : &self.sk_az,
193 7338 : &self.app_name,
194 7338 : "write",
195 7338 : same_az,
196 7338 : ]);
197 7338 : (read, write)
198 7338 : }
199 : }
200 :
201 : struct TrafficMetricsState {
202 : /// Labels for traffic metrics.
203 : labels: ConnectionLabels,
204 : /// Total bytes read from this connection.
205 : read: GenericCounter<metrics::core::AtomicU64>,
206 : /// Total bytes written to this connection.
207 : write: GenericCounter<metrics::core::AtomicU64>,
208 : }
209 :
210 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
211 3292 : #[derive(Clone)]
212 : pub struct TrafficMetrics {
213 : state: Arc<RwLock<TrafficMetricsState>>,
214 : }
215 :
216 : impl Default for TrafficMetrics {
217 0 : fn default() -> Self {
218 0 : Self::new()
219 0 : }
220 : }
221 :
222 : impl TrafficMetrics {
223 3292 : pub fn new() -> Self {
224 3292 : let labels = ConnectionLabels::new();
225 3292 : let (read, write) = labels.build_metrics();
226 3292 : let state = TrafficMetricsState {
227 3292 : labels,
228 3292 : read,
229 3292 : write,
230 3292 : };
231 3292 : Self {
232 3292 : state: Arc::new(RwLock::new(state)),
233 3292 : }
234 3292 : }
235 :
236 2 : pub fn set_client_az(&self, value: &str) {
237 2 : let mut state = self.state.write().unwrap();
238 2 : state.labels.client_az = value.to_string();
239 2 : (state.read, state.write) = state.labels.build_metrics();
240 2 : }
241 :
242 3292 : pub fn set_sk_az(&self, value: &str) {
243 3292 : let mut state = self.state.write().unwrap();
244 3292 : state.labels.sk_az = value.to_string();
245 3292 : (state.read, state.write) = state.labels.build_metrics();
246 3292 : }
247 :
248 752 : pub fn set_app_name(&self, value: &str) {
249 752 : let mut state = self.state.write().unwrap();
250 752 : state.labels.app_name = value.to_string();
251 752 : (state.read, state.write) = state.labels.build_metrics();
252 752 : }
253 :
254 2928659 : pub fn observe_read(&self, cnt: usize) {
255 2928659 : self.state.read().unwrap().read.inc_by(cnt as u64)
256 2928659 : }
257 :
258 2483186 : pub fn observe_write(&self, cnt: usize) {
259 2483186 : self.state.read().unwrap().write.inc_by(cnt as u64)
260 2483186 : }
261 : }
262 :
/// Running WAL I/O totals for one timeline's WAL storage.
///
/// These totals are exported per timeline by `TimelineCollector`.
#[derive(Clone, Default)]
pub struct WalStorageMetrics {
    /// Total number of WAL bytes written.
    write_wal_bytes: u64,
    /// Total seconds spent writing WAL (waiting for write(2)).
    write_wal_seconds: f64,
    /// Total seconds spent syncing WAL (waiting for fsync(2)).
    flush_wal_seconds: f64,
}
273 :
274 : impl WalStorageMetrics {
275 1668332 : pub fn observe_write_bytes(&mut self, bytes: usize) {
276 1668332 : self.write_wal_bytes += bytes as u64;
277 1668332 : WRITE_WAL_BYTES.observe(bytes as f64);
278 1668332 : }
279 :
280 1668332 : pub fn observe_write_seconds(&mut self, seconds: f64) {
281 1668332 : self.write_wal_seconds += seconds;
282 1668332 : WRITE_WAL_SECONDS.observe(seconds);
283 1668332 : }
284 :
285 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
286 0 : self.flush_wal_seconds += seconds;
287 0 : FLUSH_WAL_SECONDS.observe(seconds);
288 0 : }
289 : }
290 :
291 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
292 1668334 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
293 1668334 : closure: impl Future<Output = Result<(), E>>,
294 1668334 : ) -> Result<f64> {
295 1668334 : let start = std::time::Instant::now();
296 3129169 : closure.await.map_err(|e| e.into())?;
297 1668332 : Ok(start.elapsed().as_secs_f64())
298 1668334 : }
299 :
300 : /// Metrics for a single timeline.
301 0 : #[derive(Clone)]
302 : pub struct FullTimelineInfo {
303 : pub ttid: TenantTimelineId,
304 : pub ps_feedback: PageserverFeedback,
305 : pub wal_backup_active: bool,
306 : pub timeline_is_active: bool,
307 : pub num_computes: u32,
308 : pub last_removed_segno: XLogSegNo,
309 :
310 : pub epoch_start_lsn: Lsn,
311 : pub mem_state: TimelineMemState,
312 : pub persisted_state: TimelinePersistentState,
313 :
314 : pub flush_lsn: Lsn,
315 :
316 : pub wal_storage: WalStorageMetrics,
317 : }
318 :
319 : /// Collects metrics for all active timelines.
320 : pub struct TimelineCollector {
321 : descs: Vec<Desc>,
322 : commit_lsn: GenericGaugeVec<AtomicU64>,
323 : backup_lsn: GenericGaugeVec<AtomicU64>,
324 : flush_lsn: GenericGaugeVec<AtomicU64>,
325 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
326 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
327 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
328 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
329 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
330 : timeline_active: GenericGaugeVec<AtomicU64>,
331 : wal_backup_active: GenericGaugeVec<AtomicU64>,
332 : connected_computes: IntGaugeVec,
333 : disk_usage: GenericGaugeVec<AtomicU64>,
334 : acceptor_term: GenericGaugeVec<AtomicU64>,
335 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
336 : written_wal_seconds: GaugeVec,
337 : flushed_wal_seconds: GaugeVec,
338 : collect_timeline_metrics: Gauge,
339 : timelines_count: IntGauge,
340 : active_timelines_count: IntGauge,
341 : }
342 :
343 : impl Default for TimelineCollector {
344 0 : fn default() -> Self {
345 0 : Self::new()
346 0 : }
347 : }
348 :
349 : impl TimelineCollector {
350 510 : pub fn new() -> TimelineCollector {
351 510 : let mut descs = Vec::new();
352 510 :
353 510 : let commit_lsn = GenericGaugeVec::new(
354 510 : Opts::new(
355 510 : "safekeeper_commit_lsn",
356 510 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
357 510 : ),
358 510 : &["tenant_id", "timeline_id"],
359 510 : )
360 510 : .unwrap();
361 510 : descs.extend(commit_lsn.desc().into_iter().cloned());
362 510 :
363 510 : let backup_lsn = GenericGaugeVec::new(
364 510 : Opts::new(
365 510 : "safekeeper_backup_lsn",
366 510 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
367 510 : ),
368 510 : &["tenant_id", "timeline_id"],
369 510 : )
370 510 : .unwrap();
371 510 : descs.extend(backup_lsn.desc().into_iter().cloned());
372 510 :
373 510 : let flush_lsn = GenericGaugeVec::new(
374 510 : Opts::new(
375 510 : "safekeeper_flush_lsn",
376 510 : "Current flush_lsn, grouped by timeline",
377 510 : ),
378 510 : &["tenant_id", "timeline_id"],
379 510 : )
380 510 : .unwrap();
381 510 : descs.extend(flush_lsn.desc().into_iter().cloned());
382 510 :
383 510 : let epoch_start_lsn = GenericGaugeVec::new(
384 510 : Opts::new(
385 510 : "safekeeper_epoch_start_lsn",
386 510 : "Point since which compute generates new WAL in the current consensus term",
387 510 : ),
388 510 : &["tenant_id", "timeline_id"],
389 510 : )
390 510 : .unwrap();
391 510 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
392 510 :
393 510 : let peer_horizon_lsn = GenericGaugeVec::new(
394 510 : Opts::new(
395 510 : "safekeeper_peer_horizon_lsn",
396 510 : "LSN of the most lagging safekeeper",
397 510 : ),
398 510 : &["tenant_id", "timeline_id"],
399 510 : )
400 510 : .unwrap();
401 510 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
402 510 :
403 510 : let remote_consistent_lsn = GenericGaugeVec::new(
404 510 : Opts::new(
405 510 : "safekeeper_remote_consistent_lsn",
406 510 : "LSN which is persisted to the remote storage in pageserver",
407 510 : ),
408 510 : &["tenant_id", "timeline_id"],
409 510 : )
410 510 : .unwrap();
411 510 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
412 510 :
413 510 : let ps_last_received_lsn = GenericGaugeVec::new(
414 510 : Opts::new(
415 510 : "safekeeper_ps_last_received_lsn",
416 510 : "Last LSN received by the pageserver, acknowledged in the feedback",
417 510 : ),
418 510 : &["tenant_id", "timeline_id"],
419 510 : )
420 510 : .unwrap();
421 510 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
422 510 :
423 510 : let feedback_last_time_seconds = GenericGaugeVec::new(
424 510 : Opts::new(
425 510 : "safekeeper_feedback_last_time_seconds",
426 510 : "Timestamp of the last feedback from the pageserver",
427 510 : ),
428 510 : &["tenant_id", "timeline_id"],
429 510 : )
430 510 : .unwrap();
431 510 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
432 510 :
433 510 : let timeline_active = GenericGaugeVec::new(
434 510 : Opts::new(
435 510 : "safekeeper_timeline_active",
436 510 : "Reports 1 for active timelines, 0 for inactive",
437 510 : ),
438 510 : &["tenant_id", "timeline_id"],
439 510 : )
440 510 : .unwrap();
441 510 : descs.extend(timeline_active.desc().into_iter().cloned());
442 510 :
443 510 : let wal_backup_active = GenericGaugeVec::new(
444 510 : Opts::new(
445 510 : "safekeeper_wal_backup_active",
446 510 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
447 510 : ),
448 510 : &["tenant_id", "timeline_id"],
449 510 : )
450 510 : .unwrap();
451 510 : descs.extend(wal_backup_active.desc().into_iter().cloned());
452 510 :
453 510 : let connected_computes = IntGaugeVec::new(
454 510 : Opts::new(
455 510 : "safekeeper_connected_computes",
456 510 : "Number of active compute connections",
457 510 : ),
458 510 : &["tenant_id", "timeline_id"],
459 510 : )
460 510 : .unwrap();
461 510 : descs.extend(connected_computes.desc().into_iter().cloned());
462 510 :
463 510 : let disk_usage = GenericGaugeVec::new(
464 510 : Opts::new(
465 510 : "safekeeper_disk_usage_bytes",
466 510 : "Estimated disk space used to store WAL segments",
467 510 : ),
468 510 : &["tenant_id", "timeline_id"],
469 510 : )
470 510 : .unwrap();
471 510 : descs.extend(disk_usage.desc().into_iter().cloned());
472 510 :
473 510 : let acceptor_term = GenericGaugeVec::new(
474 510 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
475 510 : &["tenant_id", "timeline_id"],
476 510 : )
477 510 : .unwrap();
478 510 : descs.extend(acceptor_term.desc().into_iter().cloned());
479 510 :
480 510 : let written_wal_bytes = GenericGaugeVec::new(
481 510 : Opts::new(
482 510 : "safekeeper_written_wal_bytes_total",
483 510 : "Number of WAL bytes written to disk, grouped by timeline",
484 510 : ),
485 510 : &["tenant_id", "timeline_id"],
486 510 : )
487 510 : .unwrap();
488 510 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
489 510 :
490 510 : let written_wal_seconds = GaugeVec::new(
491 510 : Opts::new(
492 510 : "safekeeper_written_wal_seconds_total",
493 510 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
494 510 : ),
495 510 : &["tenant_id", "timeline_id"],
496 510 : )
497 510 : .unwrap();
498 510 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
499 510 :
500 510 : let flushed_wal_seconds = GaugeVec::new(
501 510 : Opts::new(
502 510 : "safekeeper_flushed_wal_seconds_total",
503 510 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
504 510 : ),
505 510 : &["tenant_id", "timeline_id"],
506 510 : )
507 510 : .unwrap();
508 510 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
509 510 :
510 510 : let collect_timeline_metrics = Gauge::new(
511 510 : "safekeeper_collect_timeline_metrics_seconds",
512 510 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
513 510 : )
514 510 : .unwrap();
515 510 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
516 510 :
517 510 : let timelines_count = IntGauge::new(
518 510 : "safekeeper_timelines",
519 510 : "Total number of timelines loaded in-memory",
520 510 : )
521 510 : .unwrap();
522 510 : descs.extend(timelines_count.desc().into_iter().cloned());
523 510 :
524 510 : let active_timelines_count = IntGauge::new(
525 510 : "safekeeper_active_timelines",
526 510 : "Total number of active timelines",
527 510 : )
528 510 : .unwrap();
529 510 : descs.extend(active_timelines_count.desc().into_iter().cloned());
530 510 :
531 510 : TimelineCollector {
532 510 : descs,
533 510 : commit_lsn,
534 510 : backup_lsn,
535 510 : flush_lsn,
536 510 : epoch_start_lsn,
537 510 : peer_horizon_lsn,
538 510 : remote_consistent_lsn,
539 510 : ps_last_received_lsn,
540 510 : feedback_last_time_seconds,
541 510 : timeline_active,
542 510 : wal_backup_active,
543 510 : connected_computes,
544 510 : disk_usage,
545 510 : acceptor_term,
546 510 : written_wal_bytes,
547 510 : written_wal_seconds,
548 510 : flushed_wal_seconds,
549 510 : collect_timeline_metrics,
550 510 : timelines_count,
551 510 : active_timelines_count,
552 510 : }
553 510 : }
554 : }
555 :
556 : impl Collector for TimelineCollector {
557 510 : fn desc(&self) -> Vec<&Desc> {
558 510 : self.descs.iter().collect()
559 510 : }
560 :
561 19 : fn collect(&self) -> Vec<MetricFamily> {
562 19 : let start_collecting = Instant::now();
563 19 :
564 19 : // reset all metrics to clean up inactive timelines
565 19 : self.commit_lsn.reset();
566 19 : self.backup_lsn.reset();
567 19 : self.flush_lsn.reset();
568 19 : self.epoch_start_lsn.reset();
569 19 : self.peer_horizon_lsn.reset();
570 19 : self.remote_consistent_lsn.reset();
571 19 : self.ps_last_received_lsn.reset();
572 19 : self.feedback_last_time_seconds.reset();
573 19 : self.timeline_active.reset();
574 19 : self.wal_backup_active.reset();
575 19 : self.connected_computes.reset();
576 19 : self.disk_usage.reset();
577 19 : self.acceptor_term.reset();
578 19 : self.written_wal_bytes.reset();
579 19 : self.written_wal_seconds.reset();
580 19 : self.flushed_wal_seconds.reset();
581 19 :
582 19 : let timelines = GlobalTimelines::get_all();
583 19 : let timelines_count = timelines.len();
584 19 : let mut active_timelines_count = 0;
585 19 :
586 19 : // Prometheus Collector is sync, and data is stored under async lock. To
587 19 : // bridge the gap with a crutch, collect data in spawned thread with
588 19 : // local tokio runtime.
589 19 : let infos = std::thread::spawn(|| {
590 19 : let rt = tokio::runtime::Builder::new_current_thread()
591 19 : .build()
592 19 : .expect("failed to create rt");
593 19 : rt.block_on(collect_timeline_metrics())
594 19 : })
595 19 : .join()
596 19 : .expect("collect_timeline_metrics thread panicked");
597 :
598 70 : for tli in &infos {
599 51 : let tenant_id = tli.ttid.tenant_id.to_string();
600 51 : let timeline_id = tli.ttid.timeline_id.to_string();
601 51 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
602 51 :
603 51 : if tli.timeline_is_active {
604 51 : active_timelines_count += 1;
605 51 : }
606 :
607 51 : self.commit_lsn
608 51 : .with_label_values(labels)
609 51 : .set(tli.mem_state.commit_lsn.into());
610 51 : self.backup_lsn
611 51 : .with_label_values(labels)
612 51 : .set(tli.mem_state.backup_lsn.into());
613 51 : self.flush_lsn
614 51 : .with_label_values(labels)
615 51 : .set(tli.flush_lsn.into());
616 51 : self.epoch_start_lsn
617 51 : .with_label_values(labels)
618 51 : .set(tli.epoch_start_lsn.into());
619 51 : self.peer_horizon_lsn
620 51 : .with_label_values(labels)
621 51 : .set(tli.mem_state.peer_horizon_lsn.into());
622 51 : self.remote_consistent_lsn
623 51 : .with_label_values(labels)
624 51 : .set(tli.mem_state.remote_consistent_lsn.into());
625 51 : self.timeline_active
626 51 : .with_label_values(labels)
627 51 : .set(tli.timeline_is_active as u64);
628 51 : self.wal_backup_active
629 51 : .with_label_values(labels)
630 51 : .set(tli.wal_backup_active as u64);
631 51 : self.connected_computes
632 51 : .with_label_values(labels)
633 51 : .set(tli.num_computes as i64);
634 51 : self.acceptor_term
635 51 : .with_label_values(labels)
636 51 : .set(tli.persisted_state.acceptor_state.term);
637 51 : self.written_wal_bytes
638 51 : .with_label_values(labels)
639 51 : .set(tli.wal_storage.write_wal_bytes);
640 51 : self.written_wal_seconds
641 51 : .with_label_values(labels)
642 51 : .set(tli.wal_storage.write_wal_seconds);
643 51 : self.flushed_wal_seconds
644 51 : .with_label_values(labels)
645 51 : .set(tli.wal_storage.flush_wal_seconds);
646 51 :
647 51 : self.ps_last_received_lsn
648 51 : .with_label_values(labels)
649 51 : .set(tli.ps_feedback.last_received_lsn.0);
650 51 : if let Ok(unix_time) = tli
651 51 : .ps_feedback
652 51 : .replytime
653 51 : .duration_since(SystemTime::UNIX_EPOCH)
654 51 : {
655 51 : self.feedback_last_time_seconds
656 51 : .with_label_values(labels)
657 51 : .set(unix_time.as_secs());
658 51 : }
659 :
660 51 : if tli.last_removed_segno != 0 {
661 0 : let segno_count = tli
662 0 : .flush_lsn
663 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
664 0 : - tli.last_removed_segno;
665 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
666 0 : self.disk_usage
667 0 : .with_label_values(labels)
668 0 : .set(disk_usage_bytes);
669 51 : }
670 : }
671 :
672 : // collect MetricFamilys.
673 19 : let mut mfs = Vec::new();
674 19 : mfs.extend(self.commit_lsn.collect());
675 19 : mfs.extend(self.backup_lsn.collect());
676 19 : mfs.extend(self.flush_lsn.collect());
677 19 : mfs.extend(self.epoch_start_lsn.collect());
678 19 : mfs.extend(self.peer_horizon_lsn.collect());
679 19 : mfs.extend(self.remote_consistent_lsn.collect());
680 19 : mfs.extend(self.ps_last_received_lsn.collect());
681 19 : mfs.extend(self.feedback_last_time_seconds.collect());
682 19 : mfs.extend(self.timeline_active.collect());
683 19 : mfs.extend(self.wal_backup_active.collect());
684 19 : mfs.extend(self.connected_computes.collect());
685 19 : mfs.extend(self.disk_usage.collect());
686 19 : mfs.extend(self.acceptor_term.collect());
687 19 : mfs.extend(self.written_wal_bytes.collect());
688 19 : mfs.extend(self.written_wal_seconds.collect());
689 19 : mfs.extend(self.flushed_wal_seconds.collect());
690 19 :
691 19 : // report time it took to collect all info
692 19 : let elapsed = start_collecting.elapsed().as_secs_f64();
693 19 : self.collect_timeline_metrics.set(elapsed);
694 19 : mfs.extend(self.collect_timeline_metrics.collect());
695 19 :
696 19 : // report total number of timelines
697 19 : self.timelines_count.set(timelines_count as i64);
698 19 : self.active_timelines_count
699 19 : .set(active_timelines_count as i64);
700 19 : mfs.extend(self.timelines_count.collect());
701 19 :
702 19 : mfs
703 19 : }
704 : }
705 :
706 19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
707 19 : let mut res = vec![];
708 19 : let timelines = GlobalTimelines::get_all();
709 :
710 70 : for tli in timelines {
711 51 : if let Some(info) = tli.info_for_metrics().await {
712 51 : res.push(info);
713 51 : }
714 : }
715 19 : res
716 19 : }
|