Line data Source code
1 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
9 : use anyhow::Result;
10 : use futures::Future;
11 : use metrics::{
12 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
13 : proto::MetricFamily,
14 : register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
15 : IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
16 : };
17 : use once_cell::sync::Lazy;
18 :
19 : use postgres_ffi::XLogSegNo;
20 : use utils::pageserver_feedback::PageserverFeedback;
21 : use utils::{id::TenantTimelineId, lsn::Lsn};
22 :
23 : use crate::{
24 : state::{TimelineMemState, TimelinePersistentState},
25 : GlobalTimelines,
26 : };
27 :
28 : // Global metrics across all timelines.
29 0 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
30 0 : register_histogram!(
31 0 : "safekeeper_write_wal_bytes",
32 0 : "Bytes written to WAL in a single request",
33 0 : vec![
34 0 : 1.0,
35 0 : 10.0,
36 0 : 100.0,
37 0 : 1024.0,
38 0 : 8192.0,
39 0 : 128.0 * 1024.0,
40 0 : 1024.0 * 1024.0,
41 0 : 10.0 * 1024.0 * 1024.0
42 0 : ]
43 0 : )
44 0 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
45 0 : });
46 0 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
47 0 : register_histogram!(
48 0 : "safekeeper_write_wal_seconds",
49 0 : "Seconds spent writing and syncing WAL to a disk in a single request",
50 0 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
51 0 : )
52 0 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
53 0 : });
54 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
55 0 : register_histogram!(
56 0 : "safekeeper_flush_wal_seconds",
57 0 : "Seconds spent syncing WAL to a disk",
58 0 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
59 0 : )
60 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
61 0 : });
62 4 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
63 4 : register_histogram!(
64 4 : "safekeeper_persist_control_file_seconds",
65 4 : "Seconds to persist and sync control file",
66 4 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
67 4 : )
68 4 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
69 4 : });
70 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
71 0 : register_int_counter_vec!(
72 0 : "safekeeper_pg_io_bytes_total",
73 0 : "Bytes read from or written to any PostgreSQL connection",
74 0 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
75 0 : )
76 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
77 0 : });
78 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
79 0 : register_int_counter!(
80 0 : "safekeeper_broker_pushed_updates_total",
81 0 : "Number of timeline updates pushed to the broker"
82 0 : )
83 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
84 0 : });
85 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
86 0 : register_int_counter_vec!(
87 0 : "safekeeper_broker_pulled_updates_total",
88 0 : "Number of timeline updates pulled and processed from the broker",
89 0 : &["result"]
90 0 : )
91 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
92 0 : });
93 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
94 0 : register_int_counter_pair_vec!(
95 0 : "safekeeper_pg_queries_received_total",
96 0 : "Number of queries received through pg protocol",
97 0 : "safekeeper_pg_queries_finished_total",
98 0 : "Number of queries finished through pg protocol",
99 0 : &["query"]
100 0 : )
101 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
102 0 : });
103 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
104 0 : register_int_counter!(
105 0 : "safekeeper_removed_wal_segments_total",
106 0 : "Number of WAL segments removed from the disk"
107 0 : )
108 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
109 0 : });
110 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
111 0 : register_int_counter!(
112 0 : "safekeeper_backed_up_segments_total",
113 0 : "Number of WAL segments backed up to the S3"
114 0 : )
115 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
116 0 : });
117 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
118 0 : register_int_counter!(
119 0 : "safekeeper_backup_errors_total",
120 0 : "Number of errors during backup"
121 0 : )
122 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
123 0 : });
124 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
125 0 : register_histogram!(
126 0 : "safekeeper_broker_push_update_seconds",
127 0 : "Seconds to push all timeline updates to the broker",
128 0 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
129 0 : )
130 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
131 0 : });
/// Histogram bucket upper bounds for metrics that count timelines.
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1.0,
    10.0,
    50.0,
    100.0,
    200.0,
    500.0,
    1000.0,
    2000.0,
    5000.0,
    10000.0,
    20000.0,
    50000.0,
];
135 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
136 0 : register_histogram!(
137 0 : "safekeeper_broker_iteration_timelines",
138 0 : "Count of timelines pushed to the broker in a single iteration",
139 0 : TIMELINES_COUNT_BUCKETS.to_vec()
140 0 : )
141 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
142 0 : });
143 :
/// Placeholder label value used when the real value is not (yet) known.
pub const LABEL_UNKNOWN: &str = "unknown";
145 :
/// Label values attached to the traffic counters of one connection.
#[derive(Clone)]
struct ConnectionLabels {
    /// Availability zone the connection originates from.
    client_az: String,
    /// Availability zone this safekeeper runs in.
    sk_az: String,
    /// Application name reported by the client.
    app_name: String,
}
156 :
157 : impl ConnectionLabels {
158 0 : fn new() -> Self {
159 0 : Self {
160 0 : client_az: LABEL_UNKNOWN.to_string(),
161 0 : sk_az: LABEL_UNKNOWN.to_string(),
162 0 : app_name: LABEL_UNKNOWN.to_string(),
163 0 : }
164 0 : }
165 :
166 0 : fn build_metrics(
167 0 : &self,
168 0 : ) -> (
169 0 : GenericCounter<metrics::core::AtomicU64>,
170 0 : GenericCounter<metrics::core::AtomicU64>,
171 0 : ) {
172 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
173 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
174 0 : (client_az, sk_az) => {
175 0 : if client_az == sk_az {
176 0 : "true"
177 : } else {
178 0 : "false"
179 : }
180 : }
181 : };
182 :
183 0 : let read = PG_IO_BYTES.with_label_values(&[
184 0 : &self.client_az,
185 0 : &self.sk_az,
186 0 : &self.app_name,
187 0 : "read",
188 0 : same_az,
189 0 : ]);
190 0 : let write = PG_IO_BYTES.with_label_values(&[
191 0 : &self.client_az,
192 0 : &self.sk_az,
193 0 : &self.app_name,
194 0 : "write",
195 0 : same_az,
196 0 : ]);
197 0 : (read, write)
198 0 : }
199 : }
200 :
201 : struct TrafficMetricsState {
202 : /// Labels for traffic metrics.
203 : labels: ConnectionLabels,
204 : /// Total bytes read from this connection.
205 : read: GenericCounter<metrics::core::AtomicU64>,
206 : /// Total bytes written to this connection.
207 : write: GenericCounter<metrics::core::AtomicU64>,
208 : }
209 :
210 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
211 0 : #[derive(Clone)]
212 : pub struct TrafficMetrics {
213 : state: Arc<RwLock<TrafficMetricsState>>,
214 : }
215 :
216 : impl Default for TrafficMetrics {
217 0 : fn default() -> Self {
218 0 : Self::new()
219 0 : }
220 : }
221 :
222 : impl TrafficMetrics {
223 0 : pub fn new() -> Self {
224 0 : let labels = ConnectionLabels::new();
225 0 : let (read, write) = labels.build_metrics();
226 0 : let state = TrafficMetricsState {
227 0 : labels,
228 0 : read,
229 0 : write,
230 0 : };
231 0 : Self {
232 0 : state: Arc::new(RwLock::new(state)),
233 0 : }
234 0 : }
235 :
236 0 : pub fn set_client_az(&self, value: &str) {
237 0 : let mut state = self.state.write().unwrap();
238 0 : state.labels.client_az = value.to_string();
239 0 : (state.read, state.write) = state.labels.build_metrics();
240 0 : }
241 :
242 0 : pub fn set_sk_az(&self, value: &str) {
243 0 : let mut state = self.state.write().unwrap();
244 0 : state.labels.sk_az = value.to_string();
245 0 : (state.read, state.write) = state.labels.build_metrics();
246 0 : }
247 :
248 0 : pub fn set_app_name(&self, value: &str) {
249 0 : let mut state = self.state.write().unwrap();
250 0 : state.labels.app_name = value.to_string();
251 0 : (state.read, state.write) = state.labels.build_metrics();
252 0 : }
253 :
254 0 : pub fn observe_read(&self, cnt: usize) {
255 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
256 0 : }
257 :
258 0 : pub fn observe_write(&self, cnt: usize) {
259 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
260 0 : }
261 : }
262 :
/// Running totals of WAL storage activity for one timeline.
#[derive(Clone, Default)]
pub struct WalStorageMetrics {
    /// Total number of bytes written.
    write_wal_bytes: u64,
    /// Total seconds spent writing WAL to disk, i.e. waiting for write(2).
    write_wal_seconds: f64,
    /// Total seconds spent syncing WAL to disk, i.e. waiting for fsync(2).
    flush_wal_seconds: f64,
}
273 :
274 : impl WalStorageMetrics {
275 0 : pub fn observe_write_bytes(&mut self, bytes: usize) {
276 0 : self.write_wal_bytes += bytes as u64;
277 0 : WRITE_WAL_BYTES.observe(bytes as f64);
278 0 : }
279 :
280 0 : pub fn observe_write_seconds(&mut self, seconds: f64) {
281 0 : self.write_wal_seconds += seconds;
282 0 : WRITE_WAL_SECONDS.observe(seconds);
283 0 : }
284 :
285 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
286 0 : self.flush_wal_seconds += seconds;
287 0 : FLUSH_WAL_SECONDS.observe(seconds);
288 0 : }
289 : }
290 :
291 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
292 0 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
293 0 : closure: impl Future<Output = Result<(), E>>,
294 0 : ) -> Result<f64> {
295 0 : let start = std::time::Instant::now();
296 0 : closure.await.map_err(|e| e.into())?;
297 0 : Ok(start.elapsed().as_secs_f64())
298 0 : }
299 :
300 : /// Metrics for a single timeline.
301 0 : #[derive(Clone)]
302 : pub struct FullTimelineInfo {
303 : pub ttid: TenantTimelineId,
304 : pub ps_feedback: PageserverFeedback,
305 : pub wal_backup_active: bool,
306 : pub timeline_is_active: bool,
307 : pub num_computes: u32,
308 : pub last_removed_segno: XLogSegNo,
309 :
310 : pub epoch_start_lsn: Lsn,
311 : pub mem_state: TimelineMemState,
312 : pub persisted_state: TimelinePersistentState,
313 :
314 : pub flush_lsn: Lsn,
315 :
316 : pub wal_storage: WalStorageMetrics,
317 : }
318 :
319 : /// Collects metrics for all active timelines.
320 : pub struct TimelineCollector {
321 : descs: Vec<Desc>,
322 : commit_lsn: GenericGaugeVec<AtomicU64>,
323 : backup_lsn: GenericGaugeVec<AtomicU64>,
324 : flush_lsn: GenericGaugeVec<AtomicU64>,
325 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
326 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
327 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
328 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
329 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
330 : timeline_active: GenericGaugeVec<AtomicU64>,
331 : wal_backup_active: GenericGaugeVec<AtomicU64>,
332 : connected_computes: IntGaugeVec,
333 : disk_usage: GenericGaugeVec<AtomicU64>,
334 : acceptor_term: GenericGaugeVec<AtomicU64>,
335 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
336 : written_wal_seconds: GaugeVec,
337 : flushed_wal_seconds: GaugeVec,
338 : collect_timeline_metrics: Gauge,
339 : timelines_count: IntGauge,
340 : active_timelines_count: IntGauge,
341 : }
342 :
343 : impl Default for TimelineCollector {
344 0 : fn default() -> Self {
345 0 : Self::new()
346 0 : }
347 : }
348 :
349 : impl TimelineCollector {
350 0 : pub fn new() -> TimelineCollector {
351 0 : let mut descs = Vec::new();
352 0 :
353 0 : let commit_lsn = GenericGaugeVec::new(
354 0 : Opts::new(
355 0 : "safekeeper_commit_lsn",
356 0 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
357 0 : ),
358 0 : &["tenant_id", "timeline_id"],
359 0 : )
360 0 : .unwrap();
361 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
362 0 :
363 0 : let backup_lsn = GenericGaugeVec::new(
364 0 : Opts::new(
365 0 : "safekeeper_backup_lsn",
366 0 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
367 0 : ),
368 0 : &["tenant_id", "timeline_id"],
369 0 : )
370 0 : .unwrap();
371 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
372 0 :
373 0 : let flush_lsn = GenericGaugeVec::new(
374 0 : Opts::new(
375 0 : "safekeeper_flush_lsn",
376 0 : "Current flush_lsn, grouped by timeline",
377 0 : ),
378 0 : &["tenant_id", "timeline_id"],
379 0 : )
380 0 : .unwrap();
381 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
382 0 :
383 0 : let epoch_start_lsn = GenericGaugeVec::new(
384 0 : Opts::new(
385 0 : "safekeeper_epoch_start_lsn",
386 0 : "Point since which compute generates new WAL in the current consensus term",
387 0 : ),
388 0 : &["tenant_id", "timeline_id"],
389 0 : )
390 0 : .unwrap();
391 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
392 0 :
393 0 : let peer_horizon_lsn = GenericGaugeVec::new(
394 0 : Opts::new(
395 0 : "safekeeper_peer_horizon_lsn",
396 0 : "LSN of the most lagging safekeeper",
397 0 : ),
398 0 : &["tenant_id", "timeline_id"],
399 0 : )
400 0 : .unwrap();
401 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
402 0 :
403 0 : let remote_consistent_lsn = GenericGaugeVec::new(
404 0 : Opts::new(
405 0 : "safekeeper_remote_consistent_lsn",
406 0 : "LSN which is persisted to the remote storage in pageserver",
407 0 : ),
408 0 : &["tenant_id", "timeline_id"],
409 0 : )
410 0 : .unwrap();
411 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
412 0 :
413 0 : let ps_last_received_lsn = GenericGaugeVec::new(
414 0 : Opts::new(
415 0 : "safekeeper_ps_last_received_lsn",
416 0 : "Last LSN received by the pageserver, acknowledged in the feedback",
417 0 : ),
418 0 : &["tenant_id", "timeline_id"],
419 0 : )
420 0 : .unwrap();
421 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
422 0 :
423 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
424 0 : Opts::new(
425 0 : "safekeeper_feedback_last_time_seconds",
426 0 : "Timestamp of the last feedback from the pageserver",
427 0 : ),
428 0 : &["tenant_id", "timeline_id"],
429 0 : )
430 0 : .unwrap();
431 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
432 0 :
433 0 : let timeline_active = GenericGaugeVec::new(
434 0 : Opts::new(
435 0 : "safekeeper_timeline_active",
436 0 : "Reports 1 for active timelines, 0 for inactive",
437 0 : ),
438 0 : &["tenant_id", "timeline_id"],
439 0 : )
440 0 : .unwrap();
441 0 : descs.extend(timeline_active.desc().into_iter().cloned());
442 0 :
443 0 : let wal_backup_active = GenericGaugeVec::new(
444 0 : Opts::new(
445 0 : "safekeeper_wal_backup_active",
446 0 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
447 0 : ),
448 0 : &["tenant_id", "timeline_id"],
449 0 : )
450 0 : .unwrap();
451 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
452 0 :
453 0 : let connected_computes = IntGaugeVec::new(
454 0 : Opts::new(
455 0 : "safekeeper_connected_computes",
456 0 : "Number of active compute connections",
457 0 : ),
458 0 : &["tenant_id", "timeline_id"],
459 0 : )
460 0 : .unwrap();
461 0 : descs.extend(connected_computes.desc().into_iter().cloned());
462 0 :
463 0 : let disk_usage = GenericGaugeVec::new(
464 0 : Opts::new(
465 0 : "safekeeper_disk_usage_bytes",
466 0 : "Estimated disk space used to store WAL segments",
467 0 : ),
468 0 : &["tenant_id", "timeline_id"],
469 0 : )
470 0 : .unwrap();
471 0 : descs.extend(disk_usage.desc().into_iter().cloned());
472 0 :
473 0 : let acceptor_term = GenericGaugeVec::new(
474 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
475 0 : &["tenant_id", "timeline_id"],
476 0 : )
477 0 : .unwrap();
478 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
479 0 :
480 0 : let written_wal_bytes = GenericGaugeVec::new(
481 0 : Opts::new(
482 0 : "safekeeper_written_wal_bytes_total",
483 0 : "Number of WAL bytes written to disk, grouped by timeline",
484 0 : ),
485 0 : &["tenant_id", "timeline_id"],
486 0 : )
487 0 : .unwrap();
488 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
489 0 :
490 0 : let written_wal_seconds = GaugeVec::new(
491 0 : Opts::new(
492 0 : "safekeeper_written_wal_seconds_total",
493 0 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
494 0 : ),
495 0 : &["tenant_id", "timeline_id"],
496 0 : )
497 0 : .unwrap();
498 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
499 0 :
500 0 : let flushed_wal_seconds = GaugeVec::new(
501 0 : Opts::new(
502 0 : "safekeeper_flushed_wal_seconds_total",
503 0 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
504 0 : ),
505 0 : &["tenant_id", "timeline_id"],
506 0 : )
507 0 : .unwrap();
508 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
509 0 :
510 0 : let collect_timeline_metrics = Gauge::new(
511 0 : "safekeeper_collect_timeline_metrics_seconds",
512 0 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
513 0 : )
514 0 : .unwrap();
515 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
516 0 :
517 0 : let timelines_count = IntGauge::new(
518 0 : "safekeeper_timelines",
519 0 : "Total number of timelines loaded in-memory",
520 0 : )
521 0 : .unwrap();
522 0 : descs.extend(timelines_count.desc().into_iter().cloned());
523 0 :
524 0 : let active_timelines_count = IntGauge::new(
525 0 : "safekeeper_active_timelines",
526 0 : "Total number of active timelines",
527 0 : )
528 0 : .unwrap();
529 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
530 0 :
531 0 : TimelineCollector {
532 0 : descs,
533 0 : commit_lsn,
534 0 : backup_lsn,
535 0 : flush_lsn,
536 0 : epoch_start_lsn,
537 0 : peer_horizon_lsn,
538 0 : remote_consistent_lsn,
539 0 : ps_last_received_lsn,
540 0 : feedback_last_time_seconds,
541 0 : timeline_active,
542 0 : wal_backup_active,
543 0 : connected_computes,
544 0 : disk_usage,
545 0 : acceptor_term,
546 0 : written_wal_bytes,
547 0 : written_wal_seconds,
548 0 : flushed_wal_seconds,
549 0 : collect_timeline_metrics,
550 0 : timelines_count,
551 0 : active_timelines_count,
552 0 : }
553 0 : }
554 : }
555 :
556 : impl Collector for TimelineCollector {
557 0 : fn desc(&self) -> Vec<&Desc> {
558 0 : self.descs.iter().collect()
559 0 : }
560 :
561 0 : fn collect(&self) -> Vec<MetricFamily> {
562 0 : let start_collecting = Instant::now();
563 0 :
564 0 : // reset all metrics to clean up inactive timelines
565 0 : self.commit_lsn.reset();
566 0 : self.backup_lsn.reset();
567 0 : self.flush_lsn.reset();
568 0 : self.epoch_start_lsn.reset();
569 0 : self.peer_horizon_lsn.reset();
570 0 : self.remote_consistent_lsn.reset();
571 0 : self.ps_last_received_lsn.reset();
572 0 : self.feedback_last_time_seconds.reset();
573 0 : self.timeline_active.reset();
574 0 : self.wal_backup_active.reset();
575 0 : self.connected_computes.reset();
576 0 : self.disk_usage.reset();
577 0 : self.acceptor_term.reset();
578 0 : self.written_wal_bytes.reset();
579 0 : self.written_wal_seconds.reset();
580 0 : self.flushed_wal_seconds.reset();
581 0 :
582 0 : let timelines = GlobalTimelines::get_all();
583 0 : let timelines_count = timelines.len();
584 0 : let mut active_timelines_count = 0;
585 0 :
586 0 : // Prometheus Collector is sync, and data is stored under async lock. To
587 0 : // bridge the gap with a crutch, collect data in spawned thread with
588 0 : // local tokio runtime.
589 0 : let infos = std::thread::spawn(|| {
590 0 : let rt = tokio::runtime::Builder::new_current_thread()
591 0 : .build()
592 0 : .expect("failed to create rt");
593 0 : rt.block_on(collect_timeline_metrics())
594 0 : })
595 0 : .join()
596 0 : .expect("collect_timeline_metrics thread panicked");
597 :
598 0 : for tli in &infos {
599 0 : let tenant_id = tli.ttid.tenant_id.to_string();
600 0 : let timeline_id = tli.ttid.timeline_id.to_string();
601 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
602 0 :
603 0 : if tli.timeline_is_active {
604 0 : active_timelines_count += 1;
605 0 : }
606 :
607 0 : self.commit_lsn
608 0 : .with_label_values(labels)
609 0 : .set(tli.mem_state.commit_lsn.into());
610 0 : self.backup_lsn
611 0 : .with_label_values(labels)
612 0 : .set(tli.mem_state.backup_lsn.into());
613 0 : self.flush_lsn
614 0 : .with_label_values(labels)
615 0 : .set(tli.flush_lsn.into());
616 0 : self.epoch_start_lsn
617 0 : .with_label_values(labels)
618 0 : .set(tli.epoch_start_lsn.into());
619 0 : self.peer_horizon_lsn
620 0 : .with_label_values(labels)
621 0 : .set(tli.mem_state.peer_horizon_lsn.into());
622 0 : self.remote_consistent_lsn
623 0 : .with_label_values(labels)
624 0 : .set(tli.mem_state.remote_consistent_lsn.into());
625 0 : self.timeline_active
626 0 : .with_label_values(labels)
627 0 : .set(tli.timeline_is_active as u64);
628 0 : self.wal_backup_active
629 0 : .with_label_values(labels)
630 0 : .set(tli.wal_backup_active as u64);
631 0 : self.connected_computes
632 0 : .with_label_values(labels)
633 0 : .set(tli.num_computes as i64);
634 0 : self.acceptor_term
635 0 : .with_label_values(labels)
636 0 : .set(tli.persisted_state.acceptor_state.term);
637 0 : self.written_wal_bytes
638 0 : .with_label_values(labels)
639 0 : .set(tli.wal_storage.write_wal_bytes);
640 0 : self.written_wal_seconds
641 0 : .with_label_values(labels)
642 0 : .set(tli.wal_storage.write_wal_seconds);
643 0 : self.flushed_wal_seconds
644 0 : .with_label_values(labels)
645 0 : .set(tli.wal_storage.flush_wal_seconds);
646 0 :
647 0 : self.ps_last_received_lsn
648 0 : .with_label_values(labels)
649 0 : .set(tli.ps_feedback.last_received_lsn.0);
650 0 : if let Ok(unix_time) = tli
651 0 : .ps_feedback
652 0 : .replytime
653 0 : .duration_since(SystemTime::UNIX_EPOCH)
654 0 : {
655 0 : self.feedback_last_time_seconds
656 0 : .with_label_values(labels)
657 0 : .set(unix_time.as_secs());
658 0 : }
659 :
660 0 : if tli.last_removed_segno != 0 {
661 0 : let segno_count = tli
662 0 : .flush_lsn
663 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
664 0 : - tli.last_removed_segno;
665 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
666 0 : self.disk_usage
667 0 : .with_label_values(labels)
668 0 : .set(disk_usage_bytes);
669 0 : }
670 : }
671 :
672 : // collect MetricFamilys.
673 0 : let mut mfs = Vec::new();
674 0 : mfs.extend(self.commit_lsn.collect());
675 0 : mfs.extend(self.backup_lsn.collect());
676 0 : mfs.extend(self.flush_lsn.collect());
677 0 : mfs.extend(self.epoch_start_lsn.collect());
678 0 : mfs.extend(self.peer_horizon_lsn.collect());
679 0 : mfs.extend(self.remote_consistent_lsn.collect());
680 0 : mfs.extend(self.ps_last_received_lsn.collect());
681 0 : mfs.extend(self.feedback_last_time_seconds.collect());
682 0 : mfs.extend(self.timeline_active.collect());
683 0 : mfs.extend(self.wal_backup_active.collect());
684 0 : mfs.extend(self.connected_computes.collect());
685 0 : mfs.extend(self.disk_usage.collect());
686 0 : mfs.extend(self.acceptor_term.collect());
687 0 : mfs.extend(self.written_wal_bytes.collect());
688 0 : mfs.extend(self.written_wal_seconds.collect());
689 0 : mfs.extend(self.flushed_wal_seconds.collect());
690 0 :
691 0 : // report time it took to collect all info
692 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
693 0 : self.collect_timeline_metrics.set(elapsed);
694 0 : mfs.extend(self.collect_timeline_metrics.collect());
695 0 :
696 0 : // report total number of timelines
697 0 : self.timelines_count.set(timelines_count as i64);
698 0 : mfs.extend(self.timelines_count.collect());
699 0 :
700 0 : self.active_timelines_count
701 0 : .set(active_timelines_count as i64);
702 0 : mfs.extend(self.active_timelines_count.collect());
703 0 :
704 0 : mfs
705 0 : }
706 : }
707 :
708 0 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
709 0 : let mut res = vec![];
710 0 : let timelines = GlobalTimelines::get_all();
711 :
712 0 : for tli in timelines {
713 0 : if let Some(info) = tli.info_for_metrics().await {
714 0 : res.push(info);
715 0 : }
716 : }
717 0 : res
718 0 : }
|