Line data Source code
//! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
9 : use anyhow::Result;
10 : use futures::Future;
11 : use metrics::{
12 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
13 : proto::MetricFamily,
14 : register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec,
15 : };
16 : use once_cell::sync::Lazy;
17 :
18 : use postgres_ffi::XLogSegNo;
19 : use utils::pageserver_feedback::PageserverFeedback;
20 : use utils::{id::TenantTimelineId, lsn::Lsn};
21 :
22 : use crate::{
23 : safekeeper::{SafeKeeperState, SafekeeperMemState},
24 : GlobalTimelines,
25 : };
26 :
27 : // Global metrics across all timelines.
28 436 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
29 436 : register_histogram!(
30 436 : "safekeeper_write_wal_bytes",
31 436 : "Bytes written to WAL in a single request",
32 436 : vec![
33 436 : 1.0,
34 436 : 10.0,
35 436 : 100.0,
36 436 : 1024.0,
37 436 : 8192.0,
38 436 : 128.0 * 1024.0,
39 436 : 1024.0 * 1024.0,
40 436 : 10.0 * 1024.0 * 1024.0
41 436 : ]
42 436 : )
43 436 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
44 436 : });
45 436 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
46 436 : register_histogram!(
47 436 : "safekeeper_write_wal_seconds",
48 436 : "Seconds spent writing and syncing WAL to a disk in a single request",
49 436 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
50 436 : )
51 436 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
52 436 : });
53 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
54 0 : register_histogram!(
55 0 : "safekeeper_flush_wal_seconds",
56 0 : "Seconds spent syncing WAL to a disk",
57 0 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
58 0 : )
59 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
60 0 : });
61 462 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
62 462 : register_histogram!(
63 462 : "safekeeper_persist_control_file_seconds",
64 462 : "Seconds to persist and sync control file",
65 462 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
66 462 : )
67 462 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
68 462 : });
69 463 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
70 463 : register_int_counter_vec!(
71 463 : "safekeeper_pg_io_bytes_total",
72 463 : "Bytes read from or written to any PostgreSQL connection",
73 463 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
74 463 : )
75 463 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
76 463 : });
77 447 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
78 447 : register_int_counter!(
79 447 : "safekeeper_broker_pushed_updates_total",
80 447 : "Number of timeline updates pushed to the broker"
81 447 : )
82 447 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
83 447 : });
84 517 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
85 517 : register_int_counter_vec!(
86 517 : "safekeeper_broker_pulled_updates_total",
87 517 : "Number of timeline updates pulled and processed from the broker",
88 517 : &["result"]
89 517 : )
90 517 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
91 517 : });
92 463 : pub static PG_QUERIES_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
93 463 : register_int_counter_vec!(
94 463 : "safekeeper_pg_queries_received_total",
95 463 : "Number of queries received through pg protocol",
96 463 : &["query"]
97 463 : )
98 463 : .expect("Failed to register safekeeper_pg_queries_received_total counter")
99 463 : });
100 456 : pub static PG_QUERIES_FINISHED: Lazy<IntCounterVec> = Lazy::new(|| {
101 456 : register_int_counter_vec!(
102 456 : "safekeeper_pg_queries_finished_total",
103 456 : "Number of queries finished through pg protocol",
104 456 : &["query"]
105 456 : )
106 456 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
107 456 : });
108 13 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
109 13 : register_int_counter!(
110 13 : "safekeeper_removed_wal_segments_total",
111 13 : "Number of WAL segments removed from the disk"
112 13 : )
113 13 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
114 13 : });
115 13 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
116 13 : register_int_counter!(
117 13 : "safekeeper_backed_up_segments_total",
118 13 : "Number of WAL segments backed up to the broker"
119 13 : )
120 13 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
121 13 : });
122 2 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
123 2 : register_int_counter!(
124 2 : "safekeeper_backup_errors_total",
125 2 : "Number of errors during backup"
126 2 : )
127 2 : .expect("Failed to register safekeeper_backup_errors_total counter")
128 2 : });
129 517 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
130 517 : register_histogram!(
131 517 : "safekeeper_broker_push_update_seconds",
132 517 : "Seconds to push all timeline updates to the broker",
133 517 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
134 517 : )
135 517 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
136 517 : });
/// Histogram bucket bounds for timeline counts, from 1 up to 50 000.
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1e0, 1e1, 5e1, 1e2, 2e2, 5e2, 1e3, 2e3, 5e3, 1e4, 2e4, 5e4,
];
140 517 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
141 517 : register_histogram!(
142 517 : "safekeeper_broker_iteration_timelines",
143 517 : "Count of timelines pushed to the broker in a single iteration",
144 517 : TIMELINES_COUNT_BUCKETS.to_vec()
145 517 : )
146 517 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
147 517 : });
148 :
/// Placeholder label value used until a connection's real label value is set.
pub const LABEL_UNKNOWN: &str = "unknown";
150 :
151 : /// Labels for traffic metrics.
152 0 : #[derive(Clone)]
153 : struct ConnectionLabels {
154 : /// Availability zone of the connection origin.
155 : client_az: String,
156 : /// Availability zone of the current safekeeper.
157 : sk_az: String,
158 : /// Client application name.
159 : app_name: String,
160 : }
161 :
162 : impl ConnectionLabels {
163 3727 : fn new() -> Self {
164 3727 : Self {
165 3727 : client_az: LABEL_UNKNOWN.to_string(),
166 3727 : sk_az: LABEL_UNKNOWN.to_string(),
167 3727 : app_name: LABEL_UNKNOWN.to_string(),
168 3727 : }
169 3727 : }
170 :
171 8287 : fn build_metrics(
172 8287 : &self,
173 8287 : ) -> (
174 8287 : GenericCounter<metrics::core::AtomicU64>,
175 8287 : GenericCounter<metrics::core::AtomicU64>,
176 8287 : ) {
177 8287 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
178 8287 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
179 8 : (client_az, sk_az) => {
180 8 : if client_az == sk_az {
181 0 : "true"
182 : } else {
183 8 : "false"
184 : }
185 : }
186 : };
187 :
188 8287 : let read = PG_IO_BYTES.with_label_values(&[
189 8287 : &self.client_az,
190 8287 : &self.sk_az,
191 8287 : &self.app_name,
192 8287 : "read",
193 8287 : same_az,
194 8287 : ]);
195 8287 : let write = PG_IO_BYTES.with_label_values(&[
196 8287 : &self.client_az,
197 8287 : &self.sk_az,
198 8287 : &self.app_name,
199 8287 : "write",
200 8287 : same_az,
201 8287 : ]);
202 8287 : (read, write)
203 8287 : }
204 : }
205 :
206 : struct TrafficMetricsState {
207 : /// Labels for traffic metrics.
208 : labels: ConnectionLabels,
209 : /// Total bytes read from this connection.
210 : read: GenericCounter<metrics::core::AtomicU64>,
211 : /// Total bytes written to this connection.
212 : write: GenericCounter<metrics::core::AtomicU64>,
213 : }
214 :
215 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
216 3727 : #[derive(Clone)]
217 : pub struct TrafficMetrics {
218 : state: Arc<RwLock<TrafficMetricsState>>,
219 : }
220 :
221 : impl Default for TrafficMetrics {
222 0 : fn default() -> Self {
223 0 : Self::new()
224 0 : }
225 : }
226 :
227 : impl TrafficMetrics {
228 3727 : pub fn new() -> Self {
229 3727 : let labels = ConnectionLabels::new();
230 3727 : let (read, write) = labels.build_metrics();
231 3727 : let state = TrafficMetricsState {
232 3727 : labels,
233 3727 : read,
234 3727 : write,
235 3727 : };
236 3727 : Self {
237 3727 : state: Arc::new(RwLock::new(state)),
238 3727 : }
239 3727 : }
240 :
241 4 : pub fn set_client_az(&self, value: &str) {
242 4 : let mut state = self.state.write().unwrap();
243 4 : state.labels.client_az = value.to_string();
244 4 : (state.read, state.write) = state.labels.build_metrics();
245 4 : }
246 :
247 3727 : pub fn set_sk_az(&self, value: &str) {
248 3727 : let mut state = self.state.write().unwrap();
249 3727 : state.labels.sk_az = value.to_string();
250 3727 : (state.read, state.write) = state.labels.build_metrics();
251 3727 : }
252 :
253 829 : pub fn set_app_name(&self, value: &str) {
254 829 : let mut state = self.state.write().unwrap();
255 829 : state.labels.app_name = value.to_string();
256 829 : (state.read, state.write) = state.labels.build_metrics();
257 829 : }
258 :
259 3228652 : pub fn observe_read(&self, cnt: usize) {
260 3228652 : self.state.read().unwrap().read.inc_by(cnt as u64)
261 3228652 : }
262 :
263 2967391 : pub fn observe_write(&self, cnt: usize) {
264 2967391 : self.state.read().unwrap().write.inc_by(cnt as u64)
265 2967391 : }
266 : }
267 :
268 : /// Metrics for WalStorage in a single timeline.
269 605 : #[derive(Clone, Default)]
270 : pub struct WalStorageMetrics {
271 : /// How much bytes were written in total.
272 : write_wal_bytes: u64,
273 : /// How much time spent writing WAL to disk, waiting for write(2).
274 : write_wal_seconds: f64,
275 : /// How much time spent syncing WAL to disk, waiting for fsync(2).
276 : flush_wal_seconds: f64,
277 : }
278 :
279 : impl WalStorageMetrics {
280 1460402 : pub fn observe_write_bytes(&mut self, bytes: usize) {
281 1460402 : self.write_wal_bytes += bytes as u64;
282 1460402 : WRITE_WAL_BYTES.observe(bytes as f64);
283 1460402 : }
284 :
285 1460402 : pub fn observe_write_seconds(&mut self, seconds: f64) {
286 1460402 : self.write_wal_seconds += seconds;
287 1460402 : WRITE_WAL_SECONDS.observe(seconds);
288 1460402 : }
289 :
290 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
291 0 : self.flush_wal_seconds += seconds;
292 0 : FLUSH_WAL_SECONDS.observe(seconds);
293 0 : }
294 : }
295 :
296 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
297 1460402 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
298 1460402 : closure: impl Future<Output = Result<(), E>>,
299 1460402 : ) -> Result<f64> {
300 1460402 : let start = std::time::Instant::now();
301 3041998 : closure.await.map_err(|e| e.into())?;
302 1460402 : Ok(start.elapsed().as_secs_f64())
303 1460402 : }
304 :
305 : /// Metrics for a single timeline.
306 0 : #[derive(Clone)]
307 : pub struct FullTimelineInfo {
308 : pub ttid: TenantTimelineId,
309 : pub ps_feedback: PageserverFeedback,
310 : pub wal_backup_active: bool,
311 : pub timeline_is_active: bool,
312 : pub num_computes: u32,
313 : pub last_removed_segno: XLogSegNo,
314 :
315 : pub epoch_start_lsn: Lsn,
316 : pub mem_state: SafekeeperMemState,
317 : pub persisted_state: SafeKeeperState,
318 :
319 : pub flush_lsn: Lsn,
320 : pub remote_consistent_lsn: Lsn,
321 :
322 : pub wal_storage: WalStorageMetrics,
323 : }
324 :
325 : /// Collects metrics for all active timelines.
326 : pub struct TimelineCollector {
327 : descs: Vec<Desc>,
328 : commit_lsn: GenericGaugeVec<AtomicU64>,
329 : backup_lsn: GenericGaugeVec<AtomicU64>,
330 : flush_lsn: GenericGaugeVec<AtomicU64>,
331 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
332 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
333 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
334 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
335 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
336 : timeline_active: GenericGaugeVec<AtomicU64>,
337 : wal_backup_active: GenericGaugeVec<AtomicU64>,
338 : connected_computes: IntGaugeVec,
339 : disk_usage: GenericGaugeVec<AtomicU64>,
340 : acceptor_term: GenericGaugeVec<AtomicU64>,
341 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
342 : written_wal_seconds: GaugeVec,
343 : flushed_wal_seconds: GaugeVec,
344 : collect_timeline_metrics: Gauge,
345 : timelines_count: IntGauge,
346 : }
347 :
348 : impl Default for TimelineCollector {
349 0 : fn default() -> Self {
350 0 : Self::new()
351 0 : }
352 : }
353 :
354 : impl TimelineCollector {
355 517 : pub fn new() -> TimelineCollector {
356 517 : let mut descs = Vec::new();
357 517 :
358 517 : let commit_lsn = GenericGaugeVec::new(
359 517 : Opts::new(
360 517 : "safekeeper_commit_lsn",
361 517 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
362 517 : ),
363 517 : &["tenant_id", "timeline_id"],
364 517 : )
365 517 : .unwrap();
366 517 : descs.extend(commit_lsn.desc().into_iter().cloned());
367 517 :
368 517 : let backup_lsn = GenericGaugeVec::new(
369 517 : Opts::new(
370 517 : "safekeeper_backup_lsn",
371 517 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
372 517 : ),
373 517 : &["tenant_id", "timeline_id"],
374 517 : )
375 517 : .unwrap();
376 517 : descs.extend(backup_lsn.desc().into_iter().cloned());
377 517 :
378 517 : let flush_lsn = GenericGaugeVec::new(
379 517 : Opts::new(
380 517 : "safekeeper_flush_lsn",
381 517 : "Current flush_lsn, grouped by timeline",
382 517 : ),
383 517 : &["tenant_id", "timeline_id"],
384 517 : )
385 517 : .unwrap();
386 517 : descs.extend(flush_lsn.desc().into_iter().cloned());
387 517 :
388 517 : let epoch_start_lsn = GenericGaugeVec::new(
389 517 : Opts::new(
390 517 : "safekeeper_epoch_start_lsn",
391 517 : "Point since which compute generates new WAL in the current consensus term",
392 517 : ),
393 517 : &["tenant_id", "timeline_id"],
394 517 : )
395 517 : .unwrap();
396 517 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
397 517 :
398 517 : let peer_horizon_lsn = GenericGaugeVec::new(
399 517 : Opts::new(
400 517 : "safekeeper_peer_horizon_lsn",
401 517 : "LSN of the most lagging safekeeper",
402 517 : ),
403 517 : &["tenant_id", "timeline_id"],
404 517 : )
405 517 : .unwrap();
406 517 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
407 517 :
408 517 : let remote_consistent_lsn = GenericGaugeVec::new(
409 517 : Opts::new(
410 517 : "safekeeper_remote_consistent_lsn",
411 517 : "LSN which is persisted to the remote storage in pageserver",
412 517 : ),
413 517 : &["tenant_id", "timeline_id"],
414 517 : )
415 517 : .unwrap();
416 517 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
417 517 :
418 517 : let ps_last_received_lsn = GenericGaugeVec::new(
419 517 : Opts::new(
420 517 : "safekeeper_ps_last_received_lsn",
421 517 : "Last LSN received by the pageserver, acknowledged in the feedback",
422 517 : ),
423 517 : &["tenant_id", "timeline_id"],
424 517 : )
425 517 : .unwrap();
426 517 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
427 517 :
428 517 : let feedback_last_time_seconds = GenericGaugeVec::new(
429 517 : Opts::new(
430 517 : "safekeeper_feedback_last_time_seconds",
431 517 : "Timestamp of the last feedback from the pageserver",
432 517 : ),
433 517 : &["tenant_id", "timeline_id"],
434 517 : )
435 517 : .unwrap();
436 517 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
437 517 :
438 517 : let timeline_active = GenericGaugeVec::new(
439 517 : Opts::new(
440 517 : "safekeeper_timeline_active",
441 517 : "Reports 1 for active timelines, 0 for inactive",
442 517 : ),
443 517 : &["tenant_id", "timeline_id"],
444 517 : )
445 517 : .unwrap();
446 517 : descs.extend(timeline_active.desc().into_iter().cloned());
447 517 :
448 517 : let wal_backup_active = GenericGaugeVec::new(
449 517 : Opts::new(
450 517 : "safekeeper_wal_backup_active",
451 517 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
452 517 : ),
453 517 : &["tenant_id", "timeline_id"],
454 517 : )
455 517 : .unwrap();
456 517 : descs.extend(wal_backup_active.desc().into_iter().cloned());
457 517 :
458 517 : let connected_computes = IntGaugeVec::new(
459 517 : Opts::new(
460 517 : "safekeeper_connected_computes",
461 517 : "Number of active compute connections",
462 517 : ),
463 517 : &["tenant_id", "timeline_id"],
464 517 : )
465 517 : .unwrap();
466 517 : descs.extend(connected_computes.desc().into_iter().cloned());
467 517 :
468 517 : let disk_usage = GenericGaugeVec::new(
469 517 : Opts::new(
470 517 : "safekeeper_disk_usage_bytes",
471 517 : "Estimated disk space used to store WAL segments",
472 517 : ),
473 517 : &["tenant_id", "timeline_id"],
474 517 : )
475 517 : .unwrap();
476 517 : descs.extend(disk_usage.desc().into_iter().cloned());
477 517 :
478 517 : let acceptor_term = GenericGaugeVec::new(
479 517 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
480 517 : &["tenant_id", "timeline_id"],
481 517 : )
482 517 : .unwrap();
483 517 : descs.extend(acceptor_term.desc().into_iter().cloned());
484 517 :
485 517 : let written_wal_bytes = GenericGaugeVec::new(
486 517 : Opts::new(
487 517 : "safekeeper_written_wal_bytes_total",
488 517 : "Number of WAL bytes written to disk, grouped by timeline",
489 517 : ),
490 517 : &["tenant_id", "timeline_id"],
491 517 : )
492 517 : .unwrap();
493 517 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
494 517 :
495 517 : let written_wal_seconds = GaugeVec::new(
496 517 : Opts::new(
497 517 : "safekeeper_written_wal_seconds_total",
498 517 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
499 517 : ),
500 517 : &["tenant_id", "timeline_id"],
501 517 : )
502 517 : .unwrap();
503 517 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
504 517 :
505 517 : let flushed_wal_seconds = GaugeVec::new(
506 517 : Opts::new(
507 517 : "safekeeper_flushed_wal_seconds_total",
508 517 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
509 517 : ),
510 517 : &["tenant_id", "timeline_id"],
511 517 : )
512 517 : .unwrap();
513 517 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
514 517 :
515 517 : let collect_timeline_metrics = Gauge::new(
516 517 : "safekeeper_collect_timeline_metrics_seconds",
517 517 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
518 517 : )
519 517 : .unwrap();
520 517 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
521 517 :
522 517 : let timelines_count = IntGauge::new(
523 517 : "safekeeper_timelines",
524 517 : "Total number of timelines loaded in-memory",
525 517 : )
526 517 : .unwrap();
527 517 : descs.extend(timelines_count.desc().into_iter().cloned());
528 517 :
529 517 : TimelineCollector {
530 517 : descs,
531 517 : commit_lsn,
532 517 : backup_lsn,
533 517 : flush_lsn,
534 517 : epoch_start_lsn,
535 517 : peer_horizon_lsn,
536 517 : remote_consistent_lsn,
537 517 : ps_last_received_lsn,
538 517 : feedback_last_time_seconds,
539 517 : timeline_active,
540 517 : wal_backup_active,
541 517 : connected_computes,
542 517 : disk_usage,
543 517 : acceptor_term,
544 517 : written_wal_bytes,
545 517 : written_wal_seconds,
546 517 : flushed_wal_seconds,
547 517 : collect_timeline_metrics,
548 517 : timelines_count,
549 517 : }
550 517 : }
551 : }
552 :
553 : impl Collector for TimelineCollector {
554 517 : fn desc(&self) -> Vec<&Desc> {
555 517 : self.descs.iter().collect()
556 517 : }
557 :
558 19 : fn collect(&self) -> Vec<MetricFamily> {
559 19 : let start_collecting = Instant::now();
560 19 :
561 19 : // reset all metrics to clean up inactive timelines
562 19 : self.commit_lsn.reset();
563 19 : self.backup_lsn.reset();
564 19 : self.flush_lsn.reset();
565 19 : self.epoch_start_lsn.reset();
566 19 : self.peer_horizon_lsn.reset();
567 19 : self.remote_consistent_lsn.reset();
568 19 : self.ps_last_received_lsn.reset();
569 19 : self.feedback_last_time_seconds.reset();
570 19 : self.timeline_active.reset();
571 19 : self.wal_backup_active.reset();
572 19 : self.connected_computes.reset();
573 19 : self.disk_usage.reset();
574 19 : self.acceptor_term.reset();
575 19 : self.written_wal_bytes.reset();
576 19 : self.written_wal_seconds.reset();
577 19 : self.flushed_wal_seconds.reset();
578 19 :
579 19 : let timelines = GlobalTimelines::get_all();
580 19 : let timelines_count = timelines.len();
581 19 :
582 19 : // Prometheus Collector is sync, and data is stored under async lock. To
583 19 : // bridge the gap with a crutch, collect data in spawned thread with
584 19 : // local tokio runtime.
585 19 : let infos = std::thread::spawn(|| {
586 19 : let rt = tokio::runtime::Builder::new_current_thread()
587 19 : .build()
588 19 : .expect("failed to create rt");
589 19 : rt.block_on(collect_timeline_metrics())
590 19 : })
591 19 : .join()
592 19 : .expect("collect_timeline_metrics thread panicked");
593 :
594 70 : for tli in &infos {
595 51 : let tenant_id = tli.ttid.tenant_id.to_string();
596 51 : let timeline_id = tli.ttid.timeline_id.to_string();
597 51 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
598 51 :
599 51 : self.commit_lsn
600 51 : .with_label_values(labels)
601 51 : .set(tli.mem_state.commit_lsn.into());
602 51 : self.backup_lsn
603 51 : .with_label_values(labels)
604 51 : .set(tli.mem_state.backup_lsn.into());
605 51 : self.flush_lsn
606 51 : .with_label_values(labels)
607 51 : .set(tli.flush_lsn.into());
608 51 : self.epoch_start_lsn
609 51 : .with_label_values(labels)
610 51 : .set(tli.epoch_start_lsn.into());
611 51 : self.peer_horizon_lsn
612 51 : .with_label_values(labels)
613 51 : .set(tli.mem_state.peer_horizon_lsn.into());
614 51 : self.remote_consistent_lsn
615 51 : .with_label_values(labels)
616 51 : .set(tli.remote_consistent_lsn.into());
617 51 : self.timeline_active
618 51 : .with_label_values(labels)
619 51 : .set(tli.timeline_is_active as u64);
620 51 : self.wal_backup_active
621 51 : .with_label_values(labels)
622 51 : .set(tli.wal_backup_active as u64);
623 51 : self.connected_computes
624 51 : .with_label_values(labels)
625 51 : .set(tli.num_computes as i64);
626 51 : self.acceptor_term
627 51 : .with_label_values(labels)
628 51 : .set(tli.persisted_state.acceptor_state.term);
629 51 : self.written_wal_bytes
630 51 : .with_label_values(labels)
631 51 : .set(tli.wal_storage.write_wal_bytes);
632 51 : self.written_wal_seconds
633 51 : .with_label_values(labels)
634 51 : .set(tli.wal_storage.write_wal_seconds);
635 51 : self.flushed_wal_seconds
636 51 : .with_label_values(labels)
637 51 : .set(tli.wal_storage.flush_wal_seconds);
638 51 :
639 51 : self.ps_last_received_lsn
640 51 : .with_label_values(labels)
641 51 : .set(tli.ps_feedback.last_received_lsn.0);
642 51 : if let Ok(unix_time) = tli
643 51 : .ps_feedback
644 51 : .replytime
645 51 : .duration_since(SystemTime::UNIX_EPOCH)
646 51 : {
647 51 : self.feedback_last_time_seconds
648 51 : .with_label_values(labels)
649 51 : .set(unix_time.as_secs());
650 51 : }
651 :
652 51 : if tli.last_removed_segno != 0 {
653 0 : let segno_count = tli
654 0 : .flush_lsn
655 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
656 0 : - tli.last_removed_segno;
657 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
658 0 : self.disk_usage
659 0 : .with_label_values(labels)
660 0 : .set(disk_usage_bytes);
661 51 : }
662 : }
663 :
664 : // collect MetricFamilys.
665 19 : let mut mfs = Vec::new();
666 19 : mfs.extend(self.commit_lsn.collect());
667 19 : mfs.extend(self.backup_lsn.collect());
668 19 : mfs.extend(self.flush_lsn.collect());
669 19 : mfs.extend(self.epoch_start_lsn.collect());
670 19 : mfs.extend(self.peer_horizon_lsn.collect());
671 19 : mfs.extend(self.remote_consistent_lsn.collect());
672 19 : mfs.extend(self.ps_last_received_lsn.collect());
673 19 : mfs.extend(self.feedback_last_time_seconds.collect());
674 19 : mfs.extend(self.timeline_active.collect());
675 19 : mfs.extend(self.wal_backup_active.collect());
676 19 : mfs.extend(self.connected_computes.collect());
677 19 : mfs.extend(self.disk_usage.collect());
678 19 : mfs.extend(self.acceptor_term.collect());
679 19 : mfs.extend(self.written_wal_bytes.collect());
680 19 : mfs.extend(self.written_wal_seconds.collect());
681 19 : mfs.extend(self.flushed_wal_seconds.collect());
682 19 :
683 19 : // report time it took to collect all info
684 19 : let elapsed = start_collecting.elapsed().as_secs_f64();
685 19 : self.collect_timeline_metrics.set(elapsed);
686 19 : mfs.extend(self.collect_timeline_metrics.collect());
687 19 :
688 19 : // report total number of timelines
689 19 : self.timelines_count.set(timelines_count as i64);
690 19 : mfs.extend(self.timelines_count.collect());
691 19 :
692 19 : mfs
693 19 : }
694 : }
695 :
696 19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
697 19 : let mut res = vec![];
698 19 : let timelines = GlobalTimelines::get_all();
699 :
700 70 : for tli in timelines {
701 51 : if let Some(info) = tli.info_for_metrics().await {
702 51 : res.push(info);
703 51 : }
704 : }
705 19 : res
706 19 : }
|