Line data Source code
1 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
9 : use anyhow::Result;
10 : use futures::Future;
11 : use metrics::{
12 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
13 : proto::MetricFamily,
14 : register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
15 : IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
16 : };
17 : use once_cell::sync::Lazy;
18 :
19 : use postgres_ffi::XLogSegNo;
20 : use utils::pageserver_feedback::PageserverFeedback;
21 : use utils::{id::TenantTimelineId, lsn::Lsn};
22 :
23 : use crate::{
24 : state::{TimelineMemState, TimelinePersistentState},
25 : GlobalTimelines,
26 : };
27 :
28 : // Global metrics across all timelines.
29 0 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
30 : register_histogram!(
31 : "safekeeper_write_wal_bytes",
32 : "Bytes written to WAL in a single request",
33 : vec![
34 : 1.0,
35 : 10.0,
36 : 100.0,
37 : 1024.0,
38 : 8192.0,
39 : 128.0 * 1024.0,
40 : 1024.0 * 1024.0,
41 : 10.0 * 1024.0 * 1024.0
42 : ]
43 : )
44 0 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
45 0 : });
46 0 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
47 : register_histogram!(
48 : "safekeeper_write_wal_seconds",
49 : "Seconds spent writing and syncing WAL to a disk in a single request",
50 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
51 : )
52 0 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
53 0 : });
54 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
55 : register_histogram!(
56 : "safekeeper_flush_wal_seconds",
57 : "Seconds spent syncing WAL to a disk",
58 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
59 : )
60 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
61 0 : });
62 4 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
63 : register_histogram!(
64 : "safekeeper_persist_control_file_seconds",
65 : "Seconds to persist and sync control file",
66 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
67 : )
68 4 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
69 4 : });
70 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
71 : register_int_counter_vec!(
72 : "safekeeper_pg_io_bytes_total",
73 : "Bytes read from or written to any PostgreSQL connection",
74 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
75 : )
76 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
77 0 : });
78 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
79 : register_int_counter!(
80 : "safekeeper_broker_pushed_updates_total",
81 : "Number of timeline updates pushed to the broker"
82 : )
83 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
84 0 : });
85 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
86 : register_int_counter_vec!(
87 : "safekeeper_broker_pulled_updates_total",
88 : "Number of timeline updates pulled and processed from the broker",
89 : &["result"]
90 : )
91 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
92 0 : });
93 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
94 : register_int_counter_pair_vec!(
95 : "safekeeper_pg_queries_received_total",
96 : "Number of queries received through pg protocol",
97 : "safekeeper_pg_queries_finished_total",
98 : "Number of queries finished through pg protocol",
99 : &["query"]
100 : )
101 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
102 0 : });
103 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
104 : register_int_counter!(
105 : "safekeeper_removed_wal_segments_total",
106 : "Number of WAL segments removed from the disk"
107 : )
108 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
109 0 : });
110 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
111 : register_int_counter!(
112 : "safekeeper_backed_up_segments_total",
113 : "Number of WAL segments backed up to the S3"
114 : )
115 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
116 0 : });
117 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
118 : register_int_counter!(
119 : "safekeeper_backup_errors_total",
120 : "Number of errors during backup"
121 : )
122 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
123 0 : });
124 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
125 : register_histogram!(
126 : "safekeeper_broker_push_update_seconds",
127 : "Seconds to push all timeline updates to the broker",
128 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
129 : )
130 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
131 0 : });
132 : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
133 : 1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
134 : ];
135 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
136 : register_histogram!(
137 : "safekeeper_broker_iteration_timelines",
138 : "Count of timelines pushed to the broker in a single iteration",
139 : TIMELINES_COUNT_BUCKETS.to_vec()
140 : )
141 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
142 0 : });
143 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
144 : register_int_counter!(
145 : "safekeeper_received_ps_feedbacks_total",
146 : "Number of pageserver feedbacks received"
147 : )
148 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
149 0 : });
150 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
151 : register_int_counter_vec!(
152 : "safekeeper_partial_backup_uploads_total",
153 : "Number of partial backup uploads to the S3",
154 : &["result"]
155 : )
156 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
157 0 : });
158 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
159 : register_int_counter!(
160 : "safekeeper_partial_backup_uploaded_bytes_total",
161 : "Number of bytes uploaded to the S3 during partial backup"
162 : )
163 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
164 0 : });
165 :
166 : pub const LABEL_UNKNOWN: &str = "unknown";
167 :
168 : /// Labels for traffic metrics.
169 : #[derive(Clone)]
170 : struct ConnectionLabels {
171 : /// Availability zone of the connection origin.
172 : client_az: String,
173 : /// Availability zone of the current safekeeper.
174 : sk_az: String,
175 : /// Client application name.
176 : app_name: String,
177 : }
178 :
179 : impl ConnectionLabels {
180 0 : fn new() -> Self {
181 0 : Self {
182 0 : client_az: LABEL_UNKNOWN.to_string(),
183 0 : sk_az: LABEL_UNKNOWN.to_string(),
184 0 : app_name: LABEL_UNKNOWN.to_string(),
185 0 : }
186 0 : }
187 :
188 0 : fn build_metrics(
189 0 : &self,
190 0 : ) -> (
191 0 : GenericCounter<metrics::core::AtomicU64>,
192 0 : GenericCounter<metrics::core::AtomicU64>,
193 0 : ) {
194 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
195 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
196 0 : (client_az, sk_az) => {
197 0 : if client_az == sk_az {
198 0 : "true"
199 : } else {
200 0 : "false"
201 : }
202 : }
203 : };
204 :
205 0 : let read = PG_IO_BYTES.with_label_values(&[
206 0 : &self.client_az,
207 0 : &self.sk_az,
208 0 : &self.app_name,
209 0 : "read",
210 0 : same_az,
211 0 : ]);
212 0 : let write = PG_IO_BYTES.with_label_values(&[
213 0 : &self.client_az,
214 0 : &self.sk_az,
215 0 : &self.app_name,
216 0 : "write",
217 0 : same_az,
218 0 : ]);
219 0 : (read, write)
220 0 : }
221 : }
222 :
223 : struct TrafficMetricsState {
224 : /// Labels for traffic metrics.
225 : labels: ConnectionLabels,
226 : /// Total bytes read from this connection.
227 : read: GenericCounter<metrics::core::AtomicU64>,
228 : /// Total bytes written to this connection.
229 : write: GenericCounter<metrics::core::AtomicU64>,
230 : }
231 :
232 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
233 : #[derive(Clone)]
234 : pub struct TrafficMetrics {
235 : state: Arc<RwLock<TrafficMetricsState>>,
236 : }
237 :
238 : impl Default for TrafficMetrics {
239 0 : fn default() -> Self {
240 0 : Self::new()
241 0 : }
242 : }
243 :
244 : impl TrafficMetrics {
245 0 : pub fn new() -> Self {
246 0 : let labels = ConnectionLabels::new();
247 0 : let (read, write) = labels.build_metrics();
248 0 : let state = TrafficMetricsState {
249 0 : labels,
250 0 : read,
251 0 : write,
252 0 : };
253 0 : Self {
254 0 : state: Arc::new(RwLock::new(state)),
255 0 : }
256 0 : }
257 :
258 0 : pub fn set_client_az(&self, value: &str) {
259 0 : let mut state = self.state.write().unwrap();
260 0 : state.labels.client_az = value.to_string();
261 0 : (state.read, state.write) = state.labels.build_metrics();
262 0 : }
263 :
264 0 : pub fn set_sk_az(&self, value: &str) {
265 0 : let mut state = self.state.write().unwrap();
266 0 : state.labels.sk_az = value.to_string();
267 0 : (state.read, state.write) = state.labels.build_metrics();
268 0 : }
269 :
270 0 : pub fn set_app_name(&self, value: &str) {
271 0 : let mut state = self.state.write().unwrap();
272 0 : state.labels.app_name = value.to_string();
273 0 : (state.read, state.write) = state.labels.build_metrics();
274 0 : }
275 :
276 0 : pub fn observe_read(&self, cnt: usize) {
277 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
278 0 : }
279 :
280 0 : pub fn observe_write(&self, cnt: usize) {
281 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
282 0 : }
283 : }
284 :
285 : /// Metrics for WalStorage in a single timeline.
286 : #[derive(Clone, Default)]
287 : pub struct WalStorageMetrics {
288 : /// How much bytes were written in total.
289 : write_wal_bytes: u64,
290 : /// How much time spent writing WAL to disk, waiting for write(2).
291 : write_wal_seconds: f64,
292 : /// How much time spent syncing WAL to disk, waiting for fsync(2).
293 : flush_wal_seconds: f64,
294 : }
295 :
296 : impl WalStorageMetrics {
297 0 : pub fn observe_write_bytes(&mut self, bytes: usize) {
298 0 : self.write_wal_bytes += bytes as u64;
299 0 : WRITE_WAL_BYTES.observe(bytes as f64);
300 0 : }
301 :
302 0 : pub fn observe_write_seconds(&mut self, seconds: f64) {
303 0 : self.write_wal_seconds += seconds;
304 0 : WRITE_WAL_SECONDS.observe(seconds);
305 0 : }
306 :
307 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
308 0 : self.flush_wal_seconds += seconds;
309 0 : FLUSH_WAL_SECONDS.observe(seconds);
310 0 : }
311 : }
312 :
313 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
314 0 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
315 0 : closure: impl Future<Output = Result<(), E>>,
316 0 : ) -> Result<f64> {
317 0 : let start = std::time::Instant::now();
318 0 : closure.await.map_err(|e| e.into())?;
319 0 : Ok(start.elapsed().as_secs_f64())
320 0 : }
321 :
/// Metrics for a single timeline.
///
/// `TimelineCollector::collect` turns one value of this type into the per-timeline
/// gauges, labelled with the tenant and timeline ids taken from `ttid`.
#[derive(Clone)]
pub struct FullTimelineInfo {
    /// Tenant and timeline ids; used as the `tenant_id` / `timeline_id` label values.
    pub ttid: TenantTimelineId,
    /// Exported as `safekeeper_ps_feedback_count_total`.
    pub ps_feedback_count: u64,
    /// Source of `safekeeper_ps_last_received_lsn` (`last_received_lsn`) and
    /// `safekeeper_feedback_last_time_seconds` (`replytime`).
    pub last_ps_feedback: PageserverFeedback,
    /// Exported as `safekeeper_wal_backup_active` (1 or 0).
    pub wal_backup_active: bool,
    /// Exported as `safekeeper_timeline_active` (1 or 0) and counted into
    /// `safekeeper_active_timelines`.
    pub timeline_is_active: bool,
    /// Exported as `safekeeper_connected_computes`.
    pub num_computes: u32,
    /// Used together with `flush_lsn` to estimate disk usage; a value of 0 skips
    /// the estimate.
    pub last_removed_segno: XLogSegNo,

    /// Exported as `safekeeper_epoch_start_lsn`.
    pub epoch_start_lsn: Lsn,
    /// Source of the commit, backup, peer horizon and remote consistent LSN gauges.
    pub mem_state: TimelineMemState,
    /// Source of the acceptor term and of the WAL segment size.
    pub persisted_state: TimelinePersistentState,

    /// Exported as `safekeeper_flush_lsn`.
    pub flush_lsn: Lsn,

    /// Source of the written bytes / written seconds / flushed seconds gauges.
    pub wal_storage: WalStorageMetrics,
}
341 :
342 : /// Collects metrics for all active timelines.
343 : pub struct TimelineCollector {
344 : descs: Vec<Desc>,
345 : commit_lsn: GenericGaugeVec<AtomicU64>,
346 : backup_lsn: GenericGaugeVec<AtomicU64>,
347 : flush_lsn: GenericGaugeVec<AtomicU64>,
348 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
349 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
350 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
351 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
352 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
353 : ps_feedback_count: GenericGaugeVec<AtomicU64>,
354 : timeline_active: GenericGaugeVec<AtomicU64>,
355 : wal_backup_active: GenericGaugeVec<AtomicU64>,
356 : connected_computes: IntGaugeVec,
357 : disk_usage: GenericGaugeVec<AtomicU64>,
358 : acceptor_term: GenericGaugeVec<AtomicU64>,
359 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
360 : written_wal_seconds: GaugeVec,
361 : flushed_wal_seconds: GaugeVec,
362 : collect_timeline_metrics: Gauge,
363 : timelines_count: IntGauge,
364 : active_timelines_count: IntGauge,
365 : }
366 :
367 : impl Default for TimelineCollector {
368 0 : fn default() -> Self {
369 0 : Self::new()
370 0 : }
371 : }
372 :
373 : impl TimelineCollector {
374 0 : pub fn new() -> TimelineCollector {
375 0 : let mut descs = Vec::new();
376 0 :
377 0 : let commit_lsn = GenericGaugeVec::new(
378 0 : Opts::new(
379 0 : "safekeeper_commit_lsn",
380 0 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
381 0 : ),
382 0 : &["tenant_id", "timeline_id"],
383 0 : )
384 0 : .unwrap();
385 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
386 0 :
387 0 : let backup_lsn = GenericGaugeVec::new(
388 0 : Opts::new(
389 0 : "safekeeper_backup_lsn",
390 0 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
391 0 : ),
392 0 : &["tenant_id", "timeline_id"],
393 0 : )
394 0 : .unwrap();
395 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
396 0 :
397 0 : let flush_lsn = GenericGaugeVec::new(
398 0 : Opts::new(
399 0 : "safekeeper_flush_lsn",
400 0 : "Current flush_lsn, grouped by timeline",
401 0 : ),
402 0 : &["tenant_id", "timeline_id"],
403 0 : )
404 0 : .unwrap();
405 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
406 0 :
407 0 : let epoch_start_lsn = GenericGaugeVec::new(
408 0 : Opts::new(
409 0 : "safekeeper_epoch_start_lsn",
410 0 : "Point since which compute generates new WAL in the current consensus term",
411 0 : ),
412 0 : &["tenant_id", "timeline_id"],
413 0 : )
414 0 : .unwrap();
415 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
416 0 :
417 0 : let peer_horizon_lsn = GenericGaugeVec::new(
418 0 : Opts::new(
419 0 : "safekeeper_peer_horizon_lsn",
420 0 : "LSN of the most lagging safekeeper",
421 0 : ),
422 0 : &["tenant_id", "timeline_id"],
423 0 : )
424 0 : .unwrap();
425 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
426 0 :
427 0 : let remote_consistent_lsn = GenericGaugeVec::new(
428 0 : Opts::new(
429 0 : "safekeeper_remote_consistent_lsn",
430 0 : "LSN which is persisted to the remote storage in pageserver",
431 0 : ),
432 0 : &["tenant_id", "timeline_id"],
433 0 : )
434 0 : .unwrap();
435 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
436 0 :
437 0 : let ps_last_received_lsn = GenericGaugeVec::new(
438 0 : Opts::new(
439 0 : "safekeeper_ps_last_received_lsn",
440 0 : "Last LSN received by the pageserver, acknowledged in the feedback",
441 0 : ),
442 0 : &["tenant_id", "timeline_id"],
443 0 : )
444 0 : .unwrap();
445 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
446 0 :
447 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
448 0 : Opts::new(
449 0 : "safekeeper_feedback_last_time_seconds",
450 0 : "Timestamp of the last feedback from the pageserver",
451 0 : ),
452 0 : &["tenant_id", "timeline_id"],
453 0 : )
454 0 : .unwrap();
455 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
456 0 :
457 0 : let ps_feedback_count = GenericGaugeVec::new(
458 0 : Opts::new(
459 0 : "safekeeper_ps_feedback_count_total",
460 0 : "Number of feedbacks received from the pageserver",
461 0 : ),
462 0 : &["tenant_id", "timeline_id"],
463 0 : )
464 0 : .unwrap();
465 0 :
466 0 : let timeline_active = GenericGaugeVec::new(
467 0 : Opts::new(
468 0 : "safekeeper_timeline_active",
469 0 : "Reports 1 for active timelines, 0 for inactive",
470 0 : ),
471 0 : &["tenant_id", "timeline_id"],
472 0 : )
473 0 : .unwrap();
474 0 : descs.extend(timeline_active.desc().into_iter().cloned());
475 0 :
476 0 : let wal_backup_active = GenericGaugeVec::new(
477 0 : Opts::new(
478 0 : "safekeeper_wal_backup_active",
479 0 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
480 0 : ),
481 0 : &["tenant_id", "timeline_id"],
482 0 : )
483 0 : .unwrap();
484 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
485 0 :
486 0 : let connected_computes = IntGaugeVec::new(
487 0 : Opts::new(
488 0 : "safekeeper_connected_computes",
489 0 : "Number of active compute connections",
490 0 : ),
491 0 : &["tenant_id", "timeline_id"],
492 0 : )
493 0 : .unwrap();
494 0 : descs.extend(connected_computes.desc().into_iter().cloned());
495 0 :
496 0 : let disk_usage = GenericGaugeVec::new(
497 0 : Opts::new(
498 0 : "safekeeper_disk_usage_bytes",
499 0 : "Estimated disk space used to store WAL segments",
500 0 : ),
501 0 : &["tenant_id", "timeline_id"],
502 0 : )
503 0 : .unwrap();
504 0 : descs.extend(disk_usage.desc().into_iter().cloned());
505 0 :
506 0 : let acceptor_term = GenericGaugeVec::new(
507 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
508 0 : &["tenant_id", "timeline_id"],
509 0 : )
510 0 : .unwrap();
511 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
512 0 :
513 0 : let written_wal_bytes = GenericGaugeVec::new(
514 0 : Opts::new(
515 0 : "safekeeper_written_wal_bytes_total",
516 0 : "Number of WAL bytes written to disk, grouped by timeline",
517 0 : ),
518 0 : &["tenant_id", "timeline_id"],
519 0 : )
520 0 : .unwrap();
521 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
522 0 :
523 0 : let written_wal_seconds = GaugeVec::new(
524 0 : Opts::new(
525 0 : "safekeeper_written_wal_seconds_total",
526 0 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
527 0 : ),
528 0 : &["tenant_id", "timeline_id"],
529 0 : )
530 0 : .unwrap();
531 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
532 0 :
533 0 : let flushed_wal_seconds = GaugeVec::new(
534 0 : Opts::new(
535 0 : "safekeeper_flushed_wal_seconds_total",
536 0 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
537 0 : ),
538 0 : &["tenant_id", "timeline_id"],
539 0 : )
540 0 : .unwrap();
541 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
542 0 :
543 0 : let collect_timeline_metrics = Gauge::new(
544 0 : "safekeeper_collect_timeline_metrics_seconds",
545 0 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
546 0 : )
547 0 : .unwrap();
548 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
549 0 :
550 0 : let timelines_count = IntGauge::new(
551 0 : "safekeeper_timelines",
552 0 : "Total number of timelines loaded in-memory",
553 0 : )
554 0 : .unwrap();
555 0 : descs.extend(timelines_count.desc().into_iter().cloned());
556 0 :
557 0 : let active_timelines_count = IntGauge::new(
558 0 : "safekeeper_active_timelines",
559 0 : "Total number of active timelines",
560 0 : )
561 0 : .unwrap();
562 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
563 0 :
564 0 : TimelineCollector {
565 0 : descs,
566 0 : commit_lsn,
567 0 : backup_lsn,
568 0 : flush_lsn,
569 0 : epoch_start_lsn,
570 0 : peer_horizon_lsn,
571 0 : remote_consistent_lsn,
572 0 : ps_last_received_lsn,
573 0 : feedback_last_time_seconds,
574 0 : ps_feedback_count,
575 0 : timeline_active,
576 0 : wal_backup_active,
577 0 : connected_computes,
578 0 : disk_usage,
579 0 : acceptor_term,
580 0 : written_wal_bytes,
581 0 : written_wal_seconds,
582 0 : flushed_wal_seconds,
583 0 : collect_timeline_metrics,
584 0 : timelines_count,
585 0 : active_timelines_count,
586 0 : }
587 0 : }
588 : }
589 :
590 : impl Collector for TimelineCollector {
591 0 : fn desc(&self) -> Vec<&Desc> {
592 0 : self.descs.iter().collect()
593 0 : }
594 :
595 0 : fn collect(&self) -> Vec<MetricFamily> {
596 0 : let start_collecting = Instant::now();
597 0 :
598 0 : // reset all metrics to clean up inactive timelines
599 0 : self.commit_lsn.reset();
600 0 : self.backup_lsn.reset();
601 0 : self.flush_lsn.reset();
602 0 : self.epoch_start_lsn.reset();
603 0 : self.peer_horizon_lsn.reset();
604 0 : self.remote_consistent_lsn.reset();
605 0 : self.ps_last_received_lsn.reset();
606 0 : self.feedback_last_time_seconds.reset();
607 0 : self.ps_feedback_count.reset();
608 0 : self.timeline_active.reset();
609 0 : self.wal_backup_active.reset();
610 0 : self.connected_computes.reset();
611 0 : self.disk_usage.reset();
612 0 : self.acceptor_term.reset();
613 0 : self.written_wal_bytes.reset();
614 0 : self.written_wal_seconds.reset();
615 0 : self.flushed_wal_seconds.reset();
616 0 :
617 0 : let timelines = GlobalTimelines::get_all();
618 0 : let timelines_count = timelines.len();
619 0 : let mut active_timelines_count = 0;
620 0 :
621 0 : // Prometheus Collector is sync, and data is stored under async lock. To
622 0 : // bridge the gap with a crutch, collect data in spawned thread with
623 0 : // local tokio runtime.
624 0 : let infos = std::thread::spawn(|| {
625 0 : let rt = tokio::runtime::Builder::new_current_thread()
626 0 : .build()
627 0 : .expect("failed to create rt");
628 0 : rt.block_on(collect_timeline_metrics())
629 0 : })
630 0 : .join()
631 0 : .expect("collect_timeline_metrics thread panicked");
632 :
633 0 : for tli in &infos {
634 0 : let tenant_id = tli.ttid.tenant_id.to_string();
635 0 : let timeline_id = tli.ttid.timeline_id.to_string();
636 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
637 0 :
638 0 : if tli.timeline_is_active {
639 0 : active_timelines_count += 1;
640 0 : }
641 :
642 0 : self.commit_lsn
643 0 : .with_label_values(labels)
644 0 : .set(tli.mem_state.commit_lsn.into());
645 0 : self.backup_lsn
646 0 : .with_label_values(labels)
647 0 : .set(tli.mem_state.backup_lsn.into());
648 0 : self.flush_lsn
649 0 : .with_label_values(labels)
650 0 : .set(tli.flush_lsn.into());
651 0 : self.epoch_start_lsn
652 0 : .with_label_values(labels)
653 0 : .set(tli.epoch_start_lsn.into());
654 0 : self.peer_horizon_lsn
655 0 : .with_label_values(labels)
656 0 : .set(tli.mem_state.peer_horizon_lsn.into());
657 0 : self.remote_consistent_lsn
658 0 : .with_label_values(labels)
659 0 : .set(tli.mem_state.remote_consistent_lsn.into());
660 0 : self.timeline_active
661 0 : .with_label_values(labels)
662 0 : .set(tli.timeline_is_active as u64);
663 0 : self.wal_backup_active
664 0 : .with_label_values(labels)
665 0 : .set(tli.wal_backup_active as u64);
666 0 : self.connected_computes
667 0 : .with_label_values(labels)
668 0 : .set(tli.num_computes as i64);
669 0 : self.acceptor_term
670 0 : .with_label_values(labels)
671 0 : .set(tli.persisted_state.acceptor_state.term);
672 0 : self.written_wal_bytes
673 0 : .with_label_values(labels)
674 0 : .set(tli.wal_storage.write_wal_bytes);
675 0 : self.written_wal_seconds
676 0 : .with_label_values(labels)
677 0 : .set(tli.wal_storage.write_wal_seconds);
678 0 : self.flushed_wal_seconds
679 0 : .with_label_values(labels)
680 0 : .set(tli.wal_storage.flush_wal_seconds);
681 0 :
682 0 : self.ps_last_received_lsn
683 0 : .with_label_values(labels)
684 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
685 0 : self.ps_feedback_count
686 0 : .with_label_values(labels)
687 0 : .set(tli.ps_feedback_count);
688 0 : if let Ok(unix_time) = tli
689 0 : .last_ps_feedback
690 0 : .replytime
691 0 : .duration_since(SystemTime::UNIX_EPOCH)
692 0 : {
693 0 : self.feedback_last_time_seconds
694 0 : .with_label_values(labels)
695 0 : .set(unix_time.as_secs());
696 0 : }
697 :
698 0 : if tli.last_removed_segno != 0 {
699 0 : let segno_count = tli
700 0 : .flush_lsn
701 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
702 0 : - tli.last_removed_segno;
703 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
704 0 : self.disk_usage
705 0 : .with_label_values(labels)
706 0 : .set(disk_usage_bytes);
707 0 : }
708 : }
709 :
710 : // collect MetricFamilys.
711 0 : let mut mfs = Vec::new();
712 0 : mfs.extend(self.commit_lsn.collect());
713 0 : mfs.extend(self.backup_lsn.collect());
714 0 : mfs.extend(self.flush_lsn.collect());
715 0 : mfs.extend(self.epoch_start_lsn.collect());
716 0 : mfs.extend(self.peer_horizon_lsn.collect());
717 0 : mfs.extend(self.remote_consistent_lsn.collect());
718 0 : mfs.extend(self.ps_last_received_lsn.collect());
719 0 : mfs.extend(self.feedback_last_time_seconds.collect());
720 0 : mfs.extend(self.ps_feedback_count.collect());
721 0 : mfs.extend(self.timeline_active.collect());
722 0 : mfs.extend(self.wal_backup_active.collect());
723 0 : mfs.extend(self.connected_computes.collect());
724 0 : mfs.extend(self.disk_usage.collect());
725 0 : mfs.extend(self.acceptor_term.collect());
726 0 : mfs.extend(self.written_wal_bytes.collect());
727 0 : mfs.extend(self.written_wal_seconds.collect());
728 0 : mfs.extend(self.flushed_wal_seconds.collect());
729 0 :
730 0 : // report time it took to collect all info
731 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
732 0 : self.collect_timeline_metrics.set(elapsed);
733 0 : mfs.extend(self.collect_timeline_metrics.collect());
734 0 :
735 0 : // report total number of timelines
736 0 : self.timelines_count.set(timelines_count as i64);
737 0 : mfs.extend(self.timelines_count.collect());
738 0 :
739 0 : self.active_timelines_count
740 0 : .set(active_timelines_count as i64);
741 0 : mfs.extend(self.active_timelines_count.collect());
742 0 :
743 0 : mfs
744 0 : }
745 : }
746 :
747 0 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
748 0 : let mut res = vec![];
749 0 : let timelines = GlobalTimelines::get_all();
750 :
751 0 : for tli in timelines {
752 0 : if let Some(info) = tli.info_for_metrics().await {
753 0 : res.push(info);
754 0 : }
755 : }
756 0 : res
757 0 : }
|