Line data Source code
//! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
9 : use anyhow::Result;
10 : use futures::Future;
11 : use metrics::{
12 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
13 : proto::MetricFamily,
14 : register_int_counter, register_int_counter_pair, register_int_counter_pair_vec,
15 : register_int_counter_vec, Gauge, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
16 : IntGaugeVec,
17 : };
18 : use once_cell::sync::Lazy;
19 :
20 : use postgres_ffi::XLogSegNo;
21 : use utils::pageserver_feedback::PageserverFeedback;
22 : use utils::{id::TenantTimelineId, lsn::Lsn};
23 :
24 : use crate::{
25 : state::{TimelineMemState, TimelinePersistentState},
26 : GlobalTimelines,
27 : };
28 :
29 : // Global metrics across all timelines.
30 0 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
31 : register_histogram!(
32 : "safekeeper_write_wal_bytes",
33 : "Bytes written to WAL in a single request",
34 : vec![
35 : 1.0,
36 : 10.0,
37 : 100.0,
38 : 1024.0,
39 : 8192.0,
40 : 128.0 * 1024.0,
41 : 1024.0 * 1024.0,
42 : 10.0 * 1024.0 * 1024.0
43 : ]
44 : )
45 0 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
46 0 : });
47 0 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
48 : register_histogram!(
49 : "safekeeper_write_wal_seconds",
50 : "Seconds spent writing and syncing WAL to a disk in a single request",
51 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
52 : )
53 0 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
54 0 : });
55 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
56 : register_histogram!(
57 : "safekeeper_flush_wal_seconds",
58 : "Seconds spent syncing WAL to a disk",
59 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
60 : )
61 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
62 0 : });
63 4 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
64 : register_histogram!(
65 : "safekeeper_persist_control_file_seconds",
66 : "Seconds to persist and sync control file",
67 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
68 : )
69 4 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
70 4 : });
71 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
72 : register_int_counter_vec!(
73 : "safekeeper_pg_io_bytes_total",
74 : "Bytes read from or written to any PostgreSQL connection",
75 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
76 : )
77 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
78 0 : });
79 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
80 : register_int_counter!(
81 : "safekeeper_broker_pushed_updates_total",
82 : "Number of timeline updates pushed to the broker"
83 : )
84 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
85 0 : });
86 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
87 : register_int_counter_vec!(
88 : "safekeeper_broker_pulled_updates_total",
89 : "Number of timeline updates pulled and processed from the broker",
90 : &["result"]
91 : )
92 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
93 0 : });
94 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
95 : register_int_counter_pair_vec!(
96 : "safekeeper_pg_queries_received_total",
97 : "Number of queries received through pg protocol",
98 : "safekeeper_pg_queries_finished_total",
99 : "Number of queries finished through pg protocol",
100 : &["query"]
101 : )
102 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
103 0 : });
104 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
105 : register_int_counter!(
106 : "safekeeper_removed_wal_segments_total",
107 : "Number of WAL segments removed from the disk"
108 : )
109 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
110 0 : });
111 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
112 : register_int_counter!(
113 : "safekeeper_backed_up_segments_total",
114 : "Number of WAL segments backed up to the S3"
115 : )
116 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
117 0 : });
118 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
119 : register_int_counter!(
120 : "safekeeper_backup_errors_total",
121 : "Number of errors during backup"
122 : )
123 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
124 0 : });
125 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
126 : register_histogram!(
127 : "safekeeper_broker_push_update_seconds",
128 : "Seconds to push all timeline updates to the broker",
129 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
130 : )
131 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
132 0 : });
/// Histogram buckets for metrics that count timelines, from 1 up to 50k.
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1.0, 10.0, 50.0, //
    100.0, 200.0, 500.0, //
    1000.0, 2000.0, 5000.0, //
    10000.0, 20000.0, 50000.0,
];
136 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
137 : register_histogram!(
138 : "safekeeper_broker_iteration_timelines",
139 : "Count of timelines pushed to the broker in a single iteration",
140 : TIMELINES_COUNT_BUCKETS.to_vec()
141 : )
142 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
143 0 : });
144 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
145 : register_int_counter!(
146 : "safekeeper_received_ps_feedbacks_total",
147 : "Number of pageserver feedbacks received"
148 : )
149 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
150 0 : });
151 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
152 : register_int_counter_vec!(
153 : "safekeeper_partial_backup_uploads_total",
154 : "Number of partial backup uploads to the S3",
155 : &["result"]
156 : )
157 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
158 0 : });
159 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
160 : register_int_counter!(
161 : "safekeeper_partial_backup_uploaded_bytes_total",
162 : "Number of bytes uploaded to the S3 during partial backup"
163 : )
164 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
165 0 : });
166 0 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
167 : register_int_counter!(
168 : "safekeeper_manager_iterations_total",
169 : "Number of iterations of the timeline manager task"
170 : )
171 0 : .expect("Failed to register safekeeper_manager_iterations_total counter")
172 0 : });
173 0 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
174 : register_int_counter!(
175 : "safekeeper_manager_active_changes_total",
176 : "Number of timeline active status changes in the timeline manager task"
177 : )
178 0 : .expect("Failed to register safekeeper_manager_active_changes_total counter")
179 0 : });
180 0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
181 : register_int_counter_pair!(
182 : "safekeeper_wal_backup_tasks_started_total",
183 : "Number of active WAL backup tasks",
184 : "safekeeper_wal_backup_tasks_finished_total",
185 : "Number of finished WAL backup tasks",
186 : )
187 0 : .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
188 0 : });
189 :
/// Placeholder label value used when the real value is not (yet) known.
pub const LABEL_UNKNOWN: &str = "unknown";
191 :
/// Label values attached to the per-connection traffic counters.
#[derive(Clone)]
struct ConnectionLabels {
    /// Availability zone the connection comes from.
    client_az: String,
    /// Availability zone this safekeeper runs in.
    sk_az: String,
    /// Application name reported by the client.
    app_name: String,
}
202 :
203 : impl ConnectionLabels {
204 0 : fn new() -> Self {
205 0 : Self {
206 0 : client_az: LABEL_UNKNOWN.to_string(),
207 0 : sk_az: LABEL_UNKNOWN.to_string(),
208 0 : app_name: LABEL_UNKNOWN.to_string(),
209 0 : }
210 0 : }
211 :
212 0 : fn build_metrics(
213 0 : &self,
214 0 : ) -> (
215 0 : GenericCounter<metrics::core::AtomicU64>,
216 0 : GenericCounter<metrics::core::AtomicU64>,
217 0 : ) {
218 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
219 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
220 0 : (client_az, sk_az) => {
221 0 : if client_az == sk_az {
222 0 : "true"
223 : } else {
224 0 : "false"
225 : }
226 : }
227 : };
228 :
229 0 : let read = PG_IO_BYTES.with_label_values(&[
230 0 : &self.client_az,
231 0 : &self.sk_az,
232 0 : &self.app_name,
233 0 : "read",
234 0 : same_az,
235 0 : ]);
236 0 : let write = PG_IO_BYTES.with_label_values(&[
237 0 : &self.client_az,
238 0 : &self.sk_az,
239 0 : &self.app_name,
240 0 : "write",
241 0 : same_az,
242 0 : ]);
243 0 : (read, write)
244 0 : }
245 : }
246 :
247 : struct TrafficMetricsState {
248 : /// Labels for traffic metrics.
249 : labels: ConnectionLabels,
250 : /// Total bytes read from this connection.
251 : read: GenericCounter<metrics::core::AtomicU64>,
252 : /// Total bytes written to this connection.
253 : write: GenericCounter<metrics::core::AtomicU64>,
254 : }
255 :
256 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
257 : #[derive(Clone)]
258 : pub struct TrafficMetrics {
259 : state: Arc<RwLock<TrafficMetricsState>>,
260 : }
261 :
262 : impl Default for TrafficMetrics {
263 0 : fn default() -> Self {
264 0 : Self::new()
265 0 : }
266 : }
267 :
268 : impl TrafficMetrics {
269 0 : pub fn new() -> Self {
270 0 : let labels = ConnectionLabels::new();
271 0 : let (read, write) = labels.build_metrics();
272 0 : let state = TrafficMetricsState {
273 0 : labels,
274 0 : read,
275 0 : write,
276 0 : };
277 0 : Self {
278 0 : state: Arc::new(RwLock::new(state)),
279 0 : }
280 0 : }
281 :
282 0 : pub fn set_client_az(&self, value: &str) {
283 0 : let mut state = self.state.write().unwrap();
284 0 : state.labels.client_az = value.to_string();
285 0 : (state.read, state.write) = state.labels.build_metrics();
286 0 : }
287 :
288 0 : pub fn set_sk_az(&self, value: &str) {
289 0 : let mut state = self.state.write().unwrap();
290 0 : state.labels.sk_az = value.to_string();
291 0 : (state.read, state.write) = state.labels.build_metrics();
292 0 : }
293 :
294 0 : pub fn set_app_name(&self, value: &str) {
295 0 : let mut state = self.state.write().unwrap();
296 0 : state.labels.app_name = value.to_string();
297 0 : (state.read, state.write) = state.labels.build_metrics();
298 0 : }
299 :
300 0 : pub fn observe_read(&self, cnt: usize) {
301 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
302 0 : }
303 :
304 0 : pub fn observe_write(&self, cnt: usize) {
305 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
306 0 : }
307 : }
308 :
/// Running WAL-storage totals for a single timeline.
#[derive(Clone, Default)]
pub struct WalStorageMetrics {
    /// Total number of WAL bytes written.
    write_wal_bytes: u64,
    /// Total seconds spent writing WAL to disk (waiting for write(2)).
    write_wal_seconds: f64,
    /// Total seconds spent syncing WAL to disk (waiting for fsync(2)).
    flush_wal_seconds: f64,
}
319 :
320 : impl WalStorageMetrics {
321 0 : pub fn observe_write_bytes(&mut self, bytes: usize) {
322 0 : self.write_wal_bytes += bytes as u64;
323 0 : WRITE_WAL_BYTES.observe(bytes as f64);
324 0 : }
325 :
326 0 : pub fn observe_write_seconds(&mut self, seconds: f64) {
327 0 : self.write_wal_seconds += seconds;
328 0 : WRITE_WAL_SECONDS.observe(seconds);
329 0 : }
330 :
331 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
332 0 : self.flush_wal_seconds += seconds;
333 0 : FLUSH_WAL_SECONDS.observe(seconds);
334 0 : }
335 : }
336 :
337 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
338 0 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
339 0 : closure: impl Future<Output = Result<(), E>>,
340 0 : ) -> Result<f64> {
341 0 : let start = std::time::Instant::now();
342 0 : closure.await.map_err(|e| e.into())?;
343 0 : Ok(start.elapsed().as_secs_f64())
344 0 : }
345 :
346 : /// Metrics for a single timeline.
347 : #[derive(Clone)]
348 : pub struct FullTimelineInfo {
349 : pub ttid: TenantTimelineId,
350 : pub ps_feedback_count: u64,
351 : pub last_ps_feedback: PageserverFeedback,
352 : pub wal_backup_active: bool,
353 : pub timeline_is_active: bool,
354 : pub num_computes: u32,
355 : pub last_removed_segno: XLogSegNo,
356 :
357 : pub epoch_start_lsn: Lsn,
358 : pub mem_state: TimelineMemState,
359 : pub persisted_state: TimelinePersistentState,
360 :
361 : pub flush_lsn: Lsn,
362 :
363 : pub wal_storage: WalStorageMetrics,
364 : }
365 :
366 : /// Collects metrics for all active timelines.
367 : pub struct TimelineCollector {
368 : descs: Vec<Desc>,
369 : commit_lsn: GenericGaugeVec<AtomicU64>,
370 : backup_lsn: GenericGaugeVec<AtomicU64>,
371 : flush_lsn: GenericGaugeVec<AtomicU64>,
372 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
373 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
374 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
375 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
376 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
377 : ps_feedback_count: GenericGaugeVec<AtomicU64>,
378 : timeline_active: GenericGaugeVec<AtomicU64>,
379 : wal_backup_active: GenericGaugeVec<AtomicU64>,
380 : connected_computes: IntGaugeVec,
381 : disk_usage: GenericGaugeVec<AtomicU64>,
382 : acceptor_term: GenericGaugeVec<AtomicU64>,
383 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
384 : written_wal_seconds: GaugeVec,
385 : flushed_wal_seconds: GaugeVec,
386 : collect_timeline_metrics: Gauge,
387 : timelines_count: IntGauge,
388 : active_timelines_count: IntGauge,
389 : }
390 :
391 : impl Default for TimelineCollector {
392 0 : fn default() -> Self {
393 0 : Self::new()
394 0 : }
395 : }
396 :
397 : impl TimelineCollector {
398 0 : pub fn new() -> TimelineCollector {
399 0 : let mut descs = Vec::new();
400 0 :
401 0 : let commit_lsn = GenericGaugeVec::new(
402 0 : Opts::new(
403 0 : "safekeeper_commit_lsn",
404 0 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
405 0 : ),
406 0 : &["tenant_id", "timeline_id"],
407 0 : )
408 0 : .unwrap();
409 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
410 0 :
411 0 : let backup_lsn = GenericGaugeVec::new(
412 0 : Opts::new(
413 0 : "safekeeper_backup_lsn",
414 0 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
415 0 : ),
416 0 : &["tenant_id", "timeline_id"],
417 0 : )
418 0 : .unwrap();
419 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
420 0 :
421 0 : let flush_lsn = GenericGaugeVec::new(
422 0 : Opts::new(
423 0 : "safekeeper_flush_lsn",
424 0 : "Current flush_lsn, grouped by timeline",
425 0 : ),
426 0 : &["tenant_id", "timeline_id"],
427 0 : )
428 0 : .unwrap();
429 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
430 0 :
431 0 : let epoch_start_lsn = GenericGaugeVec::new(
432 0 : Opts::new(
433 0 : "safekeeper_epoch_start_lsn",
434 0 : "Point since which compute generates new WAL in the current consensus term",
435 0 : ),
436 0 : &["tenant_id", "timeline_id"],
437 0 : )
438 0 : .unwrap();
439 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
440 0 :
441 0 : let peer_horizon_lsn = GenericGaugeVec::new(
442 0 : Opts::new(
443 0 : "safekeeper_peer_horizon_lsn",
444 0 : "LSN of the most lagging safekeeper",
445 0 : ),
446 0 : &["tenant_id", "timeline_id"],
447 0 : )
448 0 : .unwrap();
449 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
450 0 :
451 0 : let remote_consistent_lsn = GenericGaugeVec::new(
452 0 : Opts::new(
453 0 : "safekeeper_remote_consistent_lsn",
454 0 : "LSN which is persisted to the remote storage in pageserver",
455 0 : ),
456 0 : &["tenant_id", "timeline_id"],
457 0 : )
458 0 : .unwrap();
459 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
460 0 :
461 0 : let ps_last_received_lsn = GenericGaugeVec::new(
462 0 : Opts::new(
463 0 : "safekeeper_ps_last_received_lsn",
464 0 : "Last LSN received by the pageserver, acknowledged in the feedback",
465 0 : ),
466 0 : &["tenant_id", "timeline_id"],
467 0 : )
468 0 : .unwrap();
469 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
470 0 :
471 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
472 0 : Opts::new(
473 0 : "safekeeper_feedback_last_time_seconds",
474 0 : "Timestamp of the last feedback from the pageserver",
475 0 : ),
476 0 : &["tenant_id", "timeline_id"],
477 0 : )
478 0 : .unwrap();
479 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
480 0 :
481 0 : let ps_feedback_count = GenericGaugeVec::new(
482 0 : Opts::new(
483 0 : "safekeeper_ps_feedback_count_total",
484 0 : "Number of feedbacks received from the pageserver",
485 0 : ),
486 0 : &["tenant_id", "timeline_id"],
487 0 : )
488 0 : .unwrap();
489 0 :
490 0 : let timeline_active = GenericGaugeVec::new(
491 0 : Opts::new(
492 0 : "safekeeper_timeline_active",
493 0 : "Reports 1 for active timelines, 0 for inactive",
494 0 : ),
495 0 : &["tenant_id", "timeline_id"],
496 0 : )
497 0 : .unwrap();
498 0 : descs.extend(timeline_active.desc().into_iter().cloned());
499 0 :
500 0 : let wal_backup_active = GenericGaugeVec::new(
501 0 : Opts::new(
502 0 : "safekeeper_wal_backup_active",
503 0 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
504 0 : ),
505 0 : &["tenant_id", "timeline_id"],
506 0 : )
507 0 : .unwrap();
508 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
509 0 :
510 0 : let connected_computes = IntGaugeVec::new(
511 0 : Opts::new(
512 0 : "safekeeper_connected_computes",
513 0 : "Number of active compute connections",
514 0 : ),
515 0 : &["tenant_id", "timeline_id"],
516 0 : )
517 0 : .unwrap();
518 0 : descs.extend(connected_computes.desc().into_iter().cloned());
519 0 :
520 0 : let disk_usage = GenericGaugeVec::new(
521 0 : Opts::new(
522 0 : "safekeeper_disk_usage_bytes",
523 0 : "Estimated disk space used to store WAL segments",
524 0 : ),
525 0 : &["tenant_id", "timeline_id"],
526 0 : )
527 0 : .unwrap();
528 0 : descs.extend(disk_usage.desc().into_iter().cloned());
529 0 :
530 0 : let acceptor_term = GenericGaugeVec::new(
531 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
532 0 : &["tenant_id", "timeline_id"],
533 0 : )
534 0 : .unwrap();
535 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
536 0 :
537 0 : let written_wal_bytes = GenericGaugeVec::new(
538 0 : Opts::new(
539 0 : "safekeeper_written_wal_bytes_total",
540 0 : "Number of WAL bytes written to disk, grouped by timeline",
541 0 : ),
542 0 : &["tenant_id", "timeline_id"],
543 0 : )
544 0 : .unwrap();
545 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
546 0 :
547 0 : let written_wal_seconds = GaugeVec::new(
548 0 : Opts::new(
549 0 : "safekeeper_written_wal_seconds_total",
550 0 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
551 0 : ),
552 0 : &["tenant_id", "timeline_id"],
553 0 : )
554 0 : .unwrap();
555 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
556 0 :
557 0 : let flushed_wal_seconds = GaugeVec::new(
558 0 : Opts::new(
559 0 : "safekeeper_flushed_wal_seconds_total",
560 0 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
561 0 : ),
562 0 : &["tenant_id", "timeline_id"],
563 0 : )
564 0 : .unwrap();
565 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
566 0 :
567 0 : let collect_timeline_metrics = Gauge::new(
568 0 : "safekeeper_collect_timeline_metrics_seconds",
569 0 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
570 0 : )
571 0 : .unwrap();
572 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
573 0 :
574 0 : let timelines_count = IntGauge::new(
575 0 : "safekeeper_timelines",
576 0 : "Total number of timelines loaded in-memory",
577 0 : )
578 0 : .unwrap();
579 0 : descs.extend(timelines_count.desc().into_iter().cloned());
580 0 :
581 0 : let active_timelines_count = IntGauge::new(
582 0 : "safekeeper_active_timelines",
583 0 : "Total number of active timelines",
584 0 : )
585 0 : .unwrap();
586 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
587 0 :
588 0 : TimelineCollector {
589 0 : descs,
590 0 : commit_lsn,
591 0 : backup_lsn,
592 0 : flush_lsn,
593 0 : epoch_start_lsn,
594 0 : peer_horizon_lsn,
595 0 : remote_consistent_lsn,
596 0 : ps_last_received_lsn,
597 0 : feedback_last_time_seconds,
598 0 : ps_feedback_count,
599 0 : timeline_active,
600 0 : wal_backup_active,
601 0 : connected_computes,
602 0 : disk_usage,
603 0 : acceptor_term,
604 0 : written_wal_bytes,
605 0 : written_wal_seconds,
606 0 : flushed_wal_seconds,
607 0 : collect_timeline_metrics,
608 0 : timelines_count,
609 0 : active_timelines_count,
610 0 : }
611 0 : }
612 : }
613 :
614 : impl Collector for TimelineCollector {
615 0 : fn desc(&self) -> Vec<&Desc> {
616 0 : self.descs.iter().collect()
617 0 : }
618 :
619 0 : fn collect(&self) -> Vec<MetricFamily> {
620 0 : let start_collecting = Instant::now();
621 0 :
622 0 : // reset all metrics to clean up inactive timelines
623 0 : self.commit_lsn.reset();
624 0 : self.backup_lsn.reset();
625 0 : self.flush_lsn.reset();
626 0 : self.epoch_start_lsn.reset();
627 0 : self.peer_horizon_lsn.reset();
628 0 : self.remote_consistent_lsn.reset();
629 0 : self.ps_last_received_lsn.reset();
630 0 : self.feedback_last_time_seconds.reset();
631 0 : self.ps_feedback_count.reset();
632 0 : self.timeline_active.reset();
633 0 : self.wal_backup_active.reset();
634 0 : self.connected_computes.reset();
635 0 : self.disk_usage.reset();
636 0 : self.acceptor_term.reset();
637 0 : self.written_wal_bytes.reset();
638 0 : self.written_wal_seconds.reset();
639 0 : self.flushed_wal_seconds.reset();
640 0 :
641 0 : let timelines_count = GlobalTimelines::get_all().len();
642 0 : let mut active_timelines_count = 0;
643 0 :
644 0 : // Prometheus Collector is sync, and data is stored under async lock. To
645 0 : // bridge the gap with a crutch, collect data in spawned thread with
646 0 : // local tokio runtime.
647 0 : let infos = std::thread::spawn(|| {
648 0 : let rt = tokio::runtime::Builder::new_current_thread()
649 0 : .build()
650 0 : .expect("failed to create rt");
651 0 : rt.block_on(collect_timeline_metrics())
652 0 : })
653 0 : .join()
654 0 : .expect("collect_timeline_metrics thread panicked");
655 :
656 0 : for tli in &infos {
657 0 : let tenant_id = tli.ttid.tenant_id.to_string();
658 0 : let timeline_id = tli.ttid.timeline_id.to_string();
659 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
660 0 :
661 0 : if tli.timeline_is_active {
662 0 : active_timelines_count += 1;
663 0 : }
664 :
665 0 : self.commit_lsn
666 0 : .with_label_values(labels)
667 0 : .set(tli.mem_state.commit_lsn.into());
668 0 : self.backup_lsn
669 0 : .with_label_values(labels)
670 0 : .set(tli.mem_state.backup_lsn.into());
671 0 : self.flush_lsn
672 0 : .with_label_values(labels)
673 0 : .set(tli.flush_lsn.into());
674 0 : self.epoch_start_lsn
675 0 : .with_label_values(labels)
676 0 : .set(tli.epoch_start_lsn.into());
677 0 : self.peer_horizon_lsn
678 0 : .with_label_values(labels)
679 0 : .set(tli.mem_state.peer_horizon_lsn.into());
680 0 : self.remote_consistent_lsn
681 0 : .with_label_values(labels)
682 0 : .set(tli.mem_state.remote_consistent_lsn.into());
683 0 : self.timeline_active
684 0 : .with_label_values(labels)
685 0 : .set(tli.timeline_is_active as u64);
686 0 : self.wal_backup_active
687 0 : .with_label_values(labels)
688 0 : .set(tli.wal_backup_active as u64);
689 0 : self.connected_computes
690 0 : .with_label_values(labels)
691 0 : .set(tli.num_computes as i64);
692 0 : self.acceptor_term
693 0 : .with_label_values(labels)
694 0 : .set(tli.persisted_state.acceptor_state.term);
695 0 : self.written_wal_bytes
696 0 : .with_label_values(labels)
697 0 : .set(tli.wal_storage.write_wal_bytes);
698 0 : self.written_wal_seconds
699 0 : .with_label_values(labels)
700 0 : .set(tli.wal_storage.write_wal_seconds);
701 0 : self.flushed_wal_seconds
702 0 : .with_label_values(labels)
703 0 : .set(tli.wal_storage.flush_wal_seconds);
704 0 :
705 0 : self.ps_last_received_lsn
706 0 : .with_label_values(labels)
707 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
708 0 : self.ps_feedback_count
709 0 : .with_label_values(labels)
710 0 : .set(tli.ps_feedback_count);
711 0 : if let Ok(unix_time) = tli
712 0 : .last_ps_feedback
713 0 : .replytime
714 0 : .duration_since(SystemTime::UNIX_EPOCH)
715 0 : {
716 0 : self.feedback_last_time_seconds
717 0 : .with_label_values(labels)
718 0 : .set(unix_time.as_secs());
719 0 : }
720 :
721 0 : if tli.last_removed_segno != 0 {
722 0 : let segno_count = tli
723 0 : .flush_lsn
724 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
725 0 : - tli.last_removed_segno;
726 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
727 0 : self.disk_usage
728 0 : .with_label_values(labels)
729 0 : .set(disk_usage_bytes);
730 0 : }
731 : }
732 :
733 : // collect MetricFamilys.
734 0 : let mut mfs = Vec::new();
735 0 : mfs.extend(self.commit_lsn.collect());
736 0 : mfs.extend(self.backup_lsn.collect());
737 0 : mfs.extend(self.flush_lsn.collect());
738 0 : mfs.extend(self.epoch_start_lsn.collect());
739 0 : mfs.extend(self.peer_horizon_lsn.collect());
740 0 : mfs.extend(self.remote_consistent_lsn.collect());
741 0 : mfs.extend(self.ps_last_received_lsn.collect());
742 0 : mfs.extend(self.feedback_last_time_seconds.collect());
743 0 : mfs.extend(self.ps_feedback_count.collect());
744 0 : mfs.extend(self.timeline_active.collect());
745 0 : mfs.extend(self.wal_backup_active.collect());
746 0 : mfs.extend(self.connected_computes.collect());
747 0 : mfs.extend(self.disk_usage.collect());
748 0 : mfs.extend(self.acceptor_term.collect());
749 0 : mfs.extend(self.written_wal_bytes.collect());
750 0 : mfs.extend(self.written_wal_seconds.collect());
751 0 : mfs.extend(self.flushed_wal_seconds.collect());
752 0 :
753 0 : // report time it took to collect all info
754 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
755 0 : self.collect_timeline_metrics.set(elapsed);
756 0 : mfs.extend(self.collect_timeline_metrics.collect());
757 0 :
758 0 : // report total number of timelines
759 0 : self.timelines_count.set(timelines_count as i64);
760 0 : mfs.extend(self.timelines_count.collect());
761 0 :
762 0 : self.active_timelines_count
763 0 : .set(active_timelines_count as i64);
764 0 : mfs.extend(self.active_timelines_count.collect());
765 0 :
766 0 : mfs
767 0 : }
768 : }
769 :
770 0 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
771 0 : let mut res = vec![];
772 0 : let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
773 :
774 0 : for tli in active_timelines {
775 0 : if let Some(info) = tli.info_for_metrics().await {
776 0 : res.push(info);
777 0 : }
778 : }
779 0 : res
780 0 : }
|