TLA Line data Source code
1 : //! Global safekeeper mertics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
9 : use anyhow::Result;
10 : use futures::Future;
11 : use metrics::{
12 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
13 : proto::MetricFamily,
14 : register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec,
15 : };
16 : use once_cell::sync::Lazy;
17 :
18 : use postgres_ffi::XLogSegNo;
19 : use utils::pageserver_feedback::PageserverFeedback;
20 : use utils::{id::TenantTimelineId, lsn::Lsn};
21 :
22 : use crate::{
23 : safekeeper::{SafeKeeperState, SafekeeperMemState},
24 : GlobalTimelines,
25 : };
26 :
27 : // Global metrics across all timelines.
28 CBC 435 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
29 435 : register_histogram!(
30 435 : "safekeeper_write_wal_bytes",
31 435 : "Bytes written to WAL in a single request",
32 435 : vec![
33 435 : 1.0,
34 435 : 10.0,
35 435 : 100.0,
36 435 : 1024.0,
37 435 : 8192.0,
38 435 : 128.0 * 1024.0,
39 435 : 1024.0 * 1024.0,
40 435 : 10.0 * 1024.0 * 1024.0
41 435 : ]
42 435 : )
43 435 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
44 435 : });
45 435 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
46 435 : register_histogram!(
47 435 : "safekeeper_write_wal_seconds",
48 435 : "Seconds spent writing and syncing WAL to a disk in a single request",
49 435 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
50 435 : )
51 435 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
52 435 : });
53 UBC 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
54 0 : register_histogram!(
55 0 : "safekeeper_flush_wal_seconds",
56 0 : "Seconds spent syncing WAL to a disk",
57 0 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
58 0 : )
59 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
60 0 : });
61 CBC 439 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
62 439 : register_histogram!(
63 439 : "safekeeper_persist_control_file_seconds",
64 439 : "Seconds to persist and sync control file",
65 439 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
66 439 : )
67 439 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
68 439 : });
69 440 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
70 440 : register_int_counter_vec!(
71 440 : "safekeeper_pg_io_bytes_total",
72 440 : "Bytes read from or written to any PostgreSQL connection",
73 440 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
74 440 : )
75 440 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
76 440 : });
77 420 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
78 420 : register_int_counter!(
79 420 : "safekeeper_broker_pushed_updates_total",
80 420 : "Number of timeline updates pushed to the broker"
81 420 : )
82 420 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
83 420 : });
84 500 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
85 500 : register_int_counter_vec!(
86 500 : "safekeeper_broker_pulled_updates_total",
87 500 : "Number of timeline updates pulled and processed from the broker",
88 500 : &["result"]
89 500 : )
90 500 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
91 500 : });
92 440 : pub static PG_QUERIES_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
93 440 : register_int_counter_vec!(
94 440 : "safekeeper_pg_queries_received_total",
95 440 : "Number of queries received through pg protocol",
96 440 : &["query"]
97 440 : )
98 440 : .expect("Failed to register safekeeper_pg_queries_received_total counter")
99 440 : });
100 434 : pub static PG_QUERIES_FINISHED: Lazy<IntCounterVec> = Lazy::new(|| {
101 434 : register_int_counter_vec!(
102 434 : "safekeeper_pg_queries_finished_total",
103 434 : "Number of queries finished through pg protocol",
104 434 : &["query"]
105 434 : )
106 434 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
107 434 : });
108 13 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
109 13 : register_int_counter!(
110 13 : "safekeeper_removed_wal_segments_total",
111 13 : "Number of WAL segments removed from the disk"
112 13 : )
113 13 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
114 13 : });
115 15 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
116 15 : register_int_counter!(
117 15 : "safekeeper_backed_up_segments_total",
118 15 : "Number of WAL segments backed up to the broker"
119 15 : )
120 15 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
121 15 : });
122 1 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
123 1 : register_int_counter!(
124 1 : "safekeeper_backup_errors_total",
125 1 : "Number of errors during backup"
126 1 : )
127 1 : .expect("Failed to register safekeeper_backup_errors_total counter")
128 1 : });
129 500 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
130 500 : register_histogram!(
131 500 : "safekeeper_broker_push_update_seconds",
132 500 : "Seconds to push all timeline updates to the broker",
133 500 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
134 500 : )
135 500 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
136 500 : });
137 : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
138 : 1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
139 : ];
140 500 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
141 500 : register_histogram!(
142 500 : "safekeeper_broker_iteration_timelines",
143 500 : "Count of timelines pushed to the broker in a single iteration",
144 500 : TIMELINES_COUNT_BUCKETS.to_vec()
145 500 : )
146 500 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
147 500 : });
148 :
149 : pub const LABEL_UNKNOWN: &str = "unknown";
150 :
151 : /// Labels for traffic metrics.
152 UBC 0 : #[derive(Clone)]
153 : struct ConnectionLabels {
154 : /// Availability zone of the connection origin.
155 : client_az: String,
156 : /// Availability zone of the current safekeeper.
157 : sk_az: String,
158 : /// Client application name.
159 : app_name: String,
160 : }
161 :
162 : impl ConnectionLabels {
163 CBC 3561 : fn new() -> Self {
164 3561 : Self {
165 3561 : client_az: LABEL_UNKNOWN.to_string(),
166 3561 : sk_az: LABEL_UNKNOWN.to_string(),
167 3561 : app_name: LABEL_UNKNOWN.to_string(),
168 3561 : }
169 3561 : }
170 :
171 7908 : fn build_metrics(
172 7908 : &self,
173 7908 : ) -> (
174 7908 : GenericCounter<metrics::core::AtomicU64>,
175 7908 : GenericCounter<metrics::core::AtomicU64>,
176 7908 : ) {
177 7908 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
178 7908 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
179 8 : (client_az, sk_az) => {
180 8 : if client_az == sk_az {
181 UBC 0 : "true"
182 : } else {
183 CBC 8 : "false"
184 : }
185 : }
186 : };
187 :
188 7908 : let read = PG_IO_BYTES.with_label_values(&[
189 7908 : &self.client_az,
190 7908 : &self.sk_az,
191 7908 : &self.app_name,
192 7908 : "read",
193 7908 : same_az,
194 7908 : ]);
195 7908 : let write = PG_IO_BYTES.with_label_values(&[
196 7908 : &self.client_az,
197 7908 : &self.sk_az,
198 7908 : &self.app_name,
199 7908 : "write",
200 7908 : same_az,
201 7908 : ]);
202 7908 : (read, write)
203 7908 : }
204 : }
205 :
206 : struct TrafficMetricsState {
207 : /// Labels for traffic metrics.
208 : labels: ConnectionLabels,
209 : /// Total bytes read from this connection.
210 : read: GenericCounter<metrics::core::AtomicU64>,
211 : /// Total bytes written to this connection.
212 : write: GenericCounter<metrics::core::AtomicU64>,
213 : }
214 :
215 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
216 3561 : #[derive(Clone)]
217 : pub struct TrafficMetrics {
218 : state: Arc<RwLock<TrafficMetricsState>>,
219 : }
220 :
221 : impl Default for TrafficMetrics {
222 UBC 0 : fn default() -> Self {
223 0 : Self::new()
224 0 : }
225 : }
226 :
227 : impl TrafficMetrics {
228 CBC 3561 : pub fn new() -> Self {
229 3561 : let labels = ConnectionLabels::new();
230 3561 : let (read, write) = labels.build_metrics();
231 3561 : let state = TrafficMetricsState {
232 3561 : labels,
233 3561 : read,
234 3561 : write,
235 3561 : };
236 3561 : Self {
237 3561 : state: Arc::new(RwLock::new(state)),
238 3561 : }
239 3561 : }
240 :
241 4 : pub fn set_client_az(&self, value: &str) {
242 4 : let mut state = self.state.write().unwrap();
243 4 : state.labels.client_az = value.to_string();
244 4 : (state.read, state.write) = state.labels.build_metrics();
245 4 : }
246 :
247 3561 : pub fn set_sk_az(&self, value: &str) {
248 3561 : let mut state = self.state.write().unwrap();
249 3561 : state.labels.sk_az = value.to_string();
250 3561 : (state.read, state.write) = state.labels.build_metrics();
251 3561 : }
252 :
253 782 : pub fn set_app_name(&self, value: &str) {
254 782 : let mut state = self.state.write().unwrap();
255 782 : state.labels.app_name = value.to_string();
256 782 : (state.read, state.write) = state.labels.build_metrics();
257 782 : }
258 :
259 3539877 : pub fn observe_read(&self, cnt: usize) {
260 3539877 : self.state.read().unwrap().read.inc_by(cnt as u64)
261 3539877 : }
262 :
263 3265120 : pub fn observe_write(&self, cnt: usize) {
264 3265120 : self.state.read().unwrap().write.inc_by(cnt as u64)
265 3265120 : }
266 : }
267 :
268 : /// Metrics for WalStorage in a single timeline.
269 566 : #[derive(Clone, Default)]
270 : pub struct WalStorageMetrics {
271 : /// How much bytes were written in total.
272 : write_wal_bytes: u64,
273 : /// How much time spent writing WAL to disk, waiting for write(2).
274 : write_wal_seconds: f64,
275 : /// How much time spent syncing WAL to disk, waiting for fsync(2).
276 : flush_wal_seconds: f64,
277 : }
278 :
279 : impl WalStorageMetrics {
280 1597352 : pub fn observe_write_bytes(&mut self, bytes: usize) {
281 1597352 : self.write_wal_bytes += bytes as u64;
282 1597352 : WRITE_WAL_BYTES.observe(bytes as f64);
283 1597352 : }
284 :
285 1597352 : pub fn observe_write_seconds(&mut self, seconds: f64) {
286 1597352 : self.write_wal_seconds += seconds;
287 1597352 : WRITE_WAL_SECONDS.observe(seconds);
288 1597352 : }
289 :
290 UBC 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
291 0 : self.flush_wal_seconds += seconds;
292 0 : FLUSH_WAL_SECONDS.observe(seconds);
293 0 : }
294 : }
295 :
296 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
297 CBC 1597352 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
298 1597352 : closure: impl Future<Output = Result<(), E>>,
299 1597352 : ) -> Result<f64> {
300 1597352 : let start = std::time::Instant::now();
301 3131744 : closure.await.map_err(|e| e.into())?;
302 1597352 : Ok(start.elapsed().as_secs_f64())
303 1597352 : }
304 :
305 : /// Metrics for a single timeline.
306 UBC 0 : #[derive(Clone)]
307 : pub struct FullTimelineInfo {
308 : pub ttid: TenantTimelineId,
309 : pub ps_feedback: PageserverFeedback,
310 : pub wal_backup_active: bool,
311 : pub timeline_is_active: bool,
312 : pub num_computes: u32,
313 : pub last_removed_segno: XLogSegNo,
314 :
315 : pub epoch_start_lsn: Lsn,
316 : pub mem_state: SafekeeperMemState,
317 : pub persisted_state: SafeKeeperState,
318 :
319 : pub flush_lsn: Lsn,
320 : pub remote_consistent_lsn: Lsn,
321 :
322 : pub wal_storage: WalStorageMetrics,
323 : }
324 :
325 : /// Collects metrics for all active timelines.
326 : pub struct TimelineCollector {
327 : descs: Vec<Desc>,
328 : commit_lsn: GenericGaugeVec<AtomicU64>,
329 : backup_lsn: GenericGaugeVec<AtomicU64>,
330 : flush_lsn: GenericGaugeVec<AtomicU64>,
331 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
332 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
333 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
334 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
335 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
336 : timeline_active: GenericGaugeVec<AtomicU64>,
337 : wal_backup_active: GenericGaugeVec<AtomicU64>,
338 : connected_computes: IntGaugeVec,
339 : disk_usage: GenericGaugeVec<AtomicU64>,
340 : acceptor_term: GenericGaugeVec<AtomicU64>,
341 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
342 : written_wal_seconds: GaugeVec,
343 : flushed_wal_seconds: GaugeVec,
344 : collect_timeline_metrics: Gauge,
345 : timelines_count: IntGauge,
346 : }
347 :
348 : impl Default for TimelineCollector {
349 0 : fn default() -> Self {
350 0 : Self::new()
351 0 : }
352 : }
353 :
354 : impl TimelineCollector {
355 CBC 500 : pub fn new() -> TimelineCollector {
356 500 : let mut descs = Vec::new();
357 500 :
358 500 : let commit_lsn = GenericGaugeVec::new(
359 500 : Opts::new(
360 500 : "safekeeper_commit_lsn",
361 500 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
362 500 : ),
363 500 : &["tenant_id", "timeline_id"],
364 500 : )
365 500 : .unwrap();
366 500 : descs.extend(commit_lsn.desc().into_iter().cloned());
367 500 :
368 500 : let backup_lsn = GenericGaugeVec::new(
369 500 : Opts::new(
370 500 : "safekeeper_backup_lsn",
371 500 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
372 500 : ),
373 500 : &["tenant_id", "timeline_id"],
374 500 : )
375 500 : .unwrap();
376 500 : descs.extend(backup_lsn.desc().into_iter().cloned());
377 500 :
378 500 : let flush_lsn = GenericGaugeVec::new(
379 500 : Opts::new(
380 500 : "safekeeper_flush_lsn",
381 500 : "Current flush_lsn, grouped by timeline",
382 500 : ),
383 500 : &["tenant_id", "timeline_id"],
384 500 : )
385 500 : .unwrap();
386 500 : descs.extend(flush_lsn.desc().into_iter().cloned());
387 500 :
388 500 : let epoch_start_lsn = GenericGaugeVec::new(
389 500 : Opts::new(
390 500 : "safekeeper_epoch_start_lsn",
391 500 : "Point since which compute generates new WAL in the current consensus term",
392 500 : ),
393 500 : &["tenant_id", "timeline_id"],
394 500 : )
395 500 : .unwrap();
396 500 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
397 500 :
398 500 : let peer_horizon_lsn = GenericGaugeVec::new(
399 500 : Opts::new(
400 500 : "safekeeper_peer_horizon_lsn",
401 500 : "LSN of the most lagging safekeeper",
402 500 : ),
403 500 : &["tenant_id", "timeline_id"],
404 500 : )
405 500 : .unwrap();
406 500 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
407 500 :
408 500 : let remote_consistent_lsn = GenericGaugeVec::new(
409 500 : Opts::new(
410 500 : "safekeeper_remote_consistent_lsn",
411 500 : "LSN which is persisted to the remote storage in pageserver",
412 500 : ),
413 500 : &["tenant_id", "timeline_id"],
414 500 : )
415 500 : .unwrap();
416 500 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
417 500 :
418 500 : let ps_last_received_lsn = GenericGaugeVec::new(
419 500 : Opts::new(
420 500 : "safekeeper_ps_last_received_lsn",
421 500 : "Last LSN received by the pageserver, acknowledged in the feedback",
422 500 : ),
423 500 : &["tenant_id", "timeline_id"],
424 500 : )
425 500 : .unwrap();
426 500 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
427 500 :
428 500 : let feedback_last_time_seconds = GenericGaugeVec::new(
429 500 : Opts::new(
430 500 : "safekeeper_feedback_last_time_seconds",
431 500 : "Timestamp of the last feedback from the pageserver",
432 500 : ),
433 500 : &["tenant_id", "timeline_id"],
434 500 : )
435 500 : .unwrap();
436 500 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
437 500 :
438 500 : let timeline_active = GenericGaugeVec::new(
439 500 : Opts::new(
440 500 : "safekeeper_timeline_active",
441 500 : "Reports 1 for active timelines, 0 for inactive",
442 500 : ),
443 500 : &["tenant_id", "timeline_id"],
444 500 : )
445 500 : .unwrap();
446 500 : descs.extend(timeline_active.desc().into_iter().cloned());
447 500 :
448 500 : let wal_backup_active = GenericGaugeVec::new(
449 500 : Opts::new(
450 500 : "safekeeper_wal_backup_active",
451 500 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
452 500 : ),
453 500 : &["tenant_id", "timeline_id"],
454 500 : )
455 500 : .unwrap();
456 500 : descs.extend(wal_backup_active.desc().into_iter().cloned());
457 500 :
458 500 : let connected_computes = IntGaugeVec::new(
459 500 : Opts::new(
460 500 : "safekeeper_connected_computes",
461 500 : "Number of active compute connections",
462 500 : ),
463 500 : &["tenant_id", "timeline_id"],
464 500 : )
465 500 : .unwrap();
466 500 : descs.extend(connected_computes.desc().into_iter().cloned());
467 500 :
468 500 : let disk_usage = GenericGaugeVec::new(
469 500 : Opts::new(
470 500 : "safekeeper_disk_usage_bytes",
471 500 : "Estimated disk space used to store WAL segments",
472 500 : ),
473 500 : &["tenant_id", "timeline_id"],
474 500 : )
475 500 : .unwrap();
476 500 : descs.extend(disk_usage.desc().into_iter().cloned());
477 500 :
478 500 : let acceptor_term = GenericGaugeVec::new(
479 500 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
480 500 : &["tenant_id", "timeline_id"],
481 500 : )
482 500 : .unwrap();
483 500 : descs.extend(acceptor_term.desc().into_iter().cloned());
484 500 :
485 500 : let written_wal_bytes = GenericGaugeVec::new(
486 500 : Opts::new(
487 500 : "safekeeper_written_wal_bytes_total",
488 500 : "Number of WAL bytes written to disk, grouped by timeline",
489 500 : ),
490 500 : &["tenant_id", "timeline_id"],
491 500 : )
492 500 : .unwrap();
493 500 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
494 500 :
495 500 : let written_wal_seconds = GaugeVec::new(
496 500 : Opts::new(
497 500 : "safekeeper_written_wal_seconds_total",
498 500 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
499 500 : ),
500 500 : &["tenant_id", "timeline_id"],
501 500 : )
502 500 : .unwrap();
503 500 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
504 500 :
505 500 : let flushed_wal_seconds = GaugeVec::new(
506 500 : Opts::new(
507 500 : "safekeeper_flushed_wal_seconds_total",
508 500 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
509 500 : ),
510 500 : &["tenant_id", "timeline_id"],
511 500 : )
512 500 : .unwrap();
513 500 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
514 500 :
515 500 : let collect_timeline_metrics = Gauge::new(
516 500 : "safekeeper_collect_timeline_metrics_seconds",
517 500 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
518 500 : )
519 500 : .unwrap();
520 500 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
521 500 :
522 500 : let timelines_count = IntGauge::new(
523 500 : "safekeeper_timelines",
524 500 : "Total number of timelines loaded in-memory",
525 500 : )
526 500 : .unwrap();
527 500 : descs.extend(timelines_count.desc().into_iter().cloned());
528 500 :
529 500 : TimelineCollector {
530 500 : descs,
531 500 : commit_lsn,
532 500 : backup_lsn,
533 500 : flush_lsn,
534 500 : epoch_start_lsn,
535 500 : peer_horizon_lsn,
536 500 : remote_consistent_lsn,
537 500 : ps_last_received_lsn,
538 500 : feedback_last_time_seconds,
539 500 : timeline_active,
540 500 : wal_backup_active,
541 500 : connected_computes,
542 500 : disk_usage,
543 500 : acceptor_term,
544 500 : written_wal_bytes,
545 500 : written_wal_seconds,
546 500 : flushed_wal_seconds,
547 500 : collect_timeline_metrics,
548 500 : timelines_count,
549 500 : }
550 500 : }
551 : }
552 :
553 : impl Collector for TimelineCollector {
554 500 : fn desc(&self) -> Vec<&Desc> {
555 500 : self.descs.iter().collect()
556 500 : }
557 :
558 19 : fn collect(&self) -> Vec<MetricFamily> {
559 19 : let start_collecting = Instant::now();
560 19 :
561 19 : // reset all metrics to clean up inactive timelines
562 19 : self.commit_lsn.reset();
563 19 : self.backup_lsn.reset();
564 19 : self.flush_lsn.reset();
565 19 : self.epoch_start_lsn.reset();
566 19 : self.peer_horizon_lsn.reset();
567 19 : self.remote_consistent_lsn.reset();
568 19 : self.ps_last_received_lsn.reset();
569 19 : self.feedback_last_time_seconds.reset();
570 19 : self.timeline_active.reset();
571 19 : self.wal_backup_active.reset();
572 19 : self.connected_computes.reset();
573 19 : self.disk_usage.reset();
574 19 : self.acceptor_term.reset();
575 19 : self.written_wal_bytes.reset();
576 19 : self.written_wal_seconds.reset();
577 19 : self.flushed_wal_seconds.reset();
578 19 :
579 19 : let timelines = GlobalTimelines::get_all();
580 19 : let timelines_count = timelines.len();
581 19 :
582 19 : // Prometheus Collector is sync, and data is stored under async lock. To
583 19 : // bridge the gap with a crutch, collect data in spawned thread with
584 19 : // local tokio runtime.
585 19 : let infos = std::thread::spawn(|| {
586 19 : let rt = tokio::runtime::Builder::new_current_thread()
587 19 : .build()
588 19 : .expect("failed to create rt");
589 19 : rt.block_on(collect_timeline_metrics())
590 19 : })
591 19 : .join()
592 19 : .expect("collect_timeline_metrics thread panicked");
593 :
594 70 : for tli in &infos {
595 51 : let tenant_id = tli.ttid.tenant_id.to_string();
596 51 : let timeline_id = tli.ttid.timeline_id.to_string();
597 51 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
598 51 :
599 51 : self.commit_lsn
600 51 : .with_label_values(labels)
601 51 : .set(tli.mem_state.commit_lsn.into());
602 51 : self.backup_lsn
603 51 : .with_label_values(labels)
604 51 : .set(tli.mem_state.backup_lsn.into());
605 51 : self.flush_lsn
606 51 : .with_label_values(labels)
607 51 : .set(tli.flush_lsn.into());
608 51 : self.epoch_start_lsn
609 51 : .with_label_values(labels)
610 51 : .set(tli.epoch_start_lsn.into());
611 51 : self.peer_horizon_lsn
612 51 : .with_label_values(labels)
613 51 : .set(tli.mem_state.peer_horizon_lsn.into());
614 51 : self.remote_consistent_lsn
615 51 : .with_label_values(labels)
616 51 : .set(tli.remote_consistent_lsn.into());
617 51 : self.timeline_active
618 51 : .with_label_values(labels)
619 51 : .set(tli.timeline_is_active as u64);
620 51 : self.wal_backup_active
621 51 : .with_label_values(labels)
622 51 : .set(tli.wal_backup_active as u64);
623 51 : self.connected_computes
624 51 : .with_label_values(labels)
625 51 : .set(tli.num_computes as i64);
626 51 : self.acceptor_term
627 51 : .with_label_values(labels)
628 51 : .set(tli.persisted_state.acceptor_state.term);
629 51 : self.written_wal_bytes
630 51 : .with_label_values(labels)
631 51 : .set(tli.wal_storage.write_wal_bytes);
632 51 : self.written_wal_seconds
633 51 : .with_label_values(labels)
634 51 : .set(tli.wal_storage.write_wal_seconds);
635 51 : self.flushed_wal_seconds
636 51 : .with_label_values(labels)
637 51 : .set(tli.wal_storage.flush_wal_seconds);
638 51 :
639 51 : self.ps_last_received_lsn
640 51 : .with_label_values(labels)
641 51 : .set(tli.ps_feedback.last_received_lsn.0);
642 51 : if let Ok(unix_time) = tli
643 51 : .ps_feedback
644 51 : .replytime
645 51 : .duration_since(SystemTime::UNIX_EPOCH)
646 51 : {
647 51 : self.feedback_last_time_seconds
648 51 : .with_label_values(labels)
649 51 : .set(unix_time.as_secs());
650 51 : }
651 :
652 51 : if tli.last_removed_segno != 0 {
653 UBC 0 : let segno_count = tli
654 0 : .flush_lsn
655 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
656 0 : - tli.last_removed_segno;
657 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
658 0 : self.disk_usage
659 0 : .with_label_values(labels)
660 0 : .set(disk_usage_bytes);
661 CBC 51 : }
662 : }
663 :
664 : // collect MetricFamilys.
665 19 : let mut mfs = Vec::new();
666 19 : mfs.extend(self.commit_lsn.collect());
667 19 : mfs.extend(self.backup_lsn.collect());
668 19 : mfs.extend(self.flush_lsn.collect());
669 19 : mfs.extend(self.epoch_start_lsn.collect());
670 19 : mfs.extend(self.peer_horizon_lsn.collect());
671 19 : mfs.extend(self.remote_consistent_lsn.collect());
672 19 : mfs.extend(self.ps_last_received_lsn.collect());
673 19 : mfs.extend(self.feedback_last_time_seconds.collect());
674 19 : mfs.extend(self.timeline_active.collect());
675 19 : mfs.extend(self.wal_backup_active.collect());
676 19 : mfs.extend(self.connected_computes.collect());
677 19 : mfs.extend(self.disk_usage.collect());
678 19 : mfs.extend(self.acceptor_term.collect());
679 19 : mfs.extend(self.written_wal_bytes.collect());
680 19 : mfs.extend(self.written_wal_seconds.collect());
681 19 : mfs.extend(self.flushed_wal_seconds.collect());
682 19 :
683 19 : // report time it took to collect all info
684 19 : let elapsed = start_collecting.elapsed().as_secs_f64();
685 19 : self.collect_timeline_metrics.set(elapsed);
686 19 : mfs.extend(self.collect_timeline_metrics.collect());
687 19 :
688 19 : // report total number of timelines
689 19 : self.timelines_count.set(timelines_count as i64);
690 19 : mfs.extend(self.timelines_count.collect());
691 19 :
692 19 : mfs
693 19 : }
694 : }
695 :
696 19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
697 19 : let mut res = vec![];
698 19 : let timelines = GlobalTimelines::get_all();
699 :
700 70 : for tli in timelines {
701 51 : if let Some(info) = tli.info_for_metrics().await {
702 51 : res.push(info);
703 51 : }
704 : }
705 19 : res
706 19 : }
|