Line data Source code
1 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
9 : use anyhow::Result;
10 : use futures::Future;
11 : use metrics::{
12 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
13 : proto::MetricFamily,
14 : register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
15 : IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
16 : };
17 : use once_cell::sync::Lazy;
18 :
19 : use postgres_ffi::XLogSegNo;
20 : use utils::pageserver_feedback::PageserverFeedback;
21 : use utils::{id::TenantTimelineId, lsn::Lsn};
22 :
23 : use crate::{
24 : state::{TimelineMemState, TimelinePersistentState},
25 : GlobalTimelines,
26 : };
27 :
28 : // Global metrics across all timelines.
29 431 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
30 431 : register_histogram!(
31 431 : "safekeeper_write_wal_bytes",
32 431 : "Bytes written to WAL in a single request",
33 431 : vec![
34 431 : 1.0,
35 431 : 10.0,
36 431 : 100.0,
37 431 : 1024.0,
38 431 : 8192.0,
39 431 : 128.0 * 1024.0,
40 431 : 1024.0 * 1024.0,
41 431 : 10.0 * 1024.0 * 1024.0
42 431 : ]
43 431 : )
44 431 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
45 431 : });
46 431 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
47 431 : register_histogram!(
48 431 : "safekeeper_write_wal_seconds",
49 431 : "Seconds spent writing and syncing WAL to a disk in a single request",
50 431 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
51 431 : )
52 431 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
53 431 : });
54 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
55 0 : register_histogram!(
56 0 : "safekeeper_flush_wal_seconds",
57 0 : "Seconds spent syncing WAL to a disk",
58 0 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
59 0 : )
60 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
61 0 : });
62 439 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
63 439 : register_histogram!(
64 439 : "safekeeper_persist_control_file_seconds",
65 439 : "Seconds to persist and sync control file",
66 439 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
67 439 : )
68 439 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
69 439 : });
70 438 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
71 438 : register_int_counter_vec!(
72 438 : "safekeeper_pg_io_bytes_total",
73 438 : "Bytes read from or written to any PostgreSQL connection",
74 438 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
75 438 : )
76 438 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
77 438 : });
78 421 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
79 421 : register_int_counter!(
80 421 : "safekeeper_broker_pushed_updates_total",
81 421 : "Number of timeline updates pushed to the broker"
82 421 : )
83 421 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
84 421 : });
85 508 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
86 508 : register_int_counter_vec!(
87 508 : "safekeeper_broker_pulled_updates_total",
88 508 : "Number of timeline updates pulled and processed from the broker",
89 508 : &["result"]
90 508 : )
91 508 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
92 508 : });
93 438 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
94 876 : register_int_counter_pair_vec!(
95 876 : "safekeeper_pg_queries_received_total",
96 876 : "Number of queries received through pg protocol",
97 876 : "safekeeper_pg_queries_finished_total",
98 876 : "Number of queries finished through pg protocol",
99 876 : &["query"]
100 876 : )
101 438 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
102 438 : });
103 7 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
104 7 : register_int_counter!(
105 7 : "safekeeper_removed_wal_segments_total",
106 7 : "Number of WAL segments removed from the disk"
107 7 : )
108 7 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
109 7 : });
110 7 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
111 7 : register_int_counter!(
112 7 : "safekeeper_backed_up_segments_total",
113 7 : "Number of WAL segments backed up to the S3"
114 7 : )
115 7 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
116 7 : });
117 1 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
118 1 : register_int_counter!(
119 1 : "safekeeper_backup_errors_total",
120 1 : "Number of errors during backup"
121 1 : )
122 1 : .expect("Failed to register safekeeper_backup_errors_total counter")
123 1 : });
124 508 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
125 508 : register_histogram!(
126 508 : "safekeeper_broker_push_update_seconds",
127 508 : "Seconds to push all timeline updates to the broker",
128 508 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
129 508 : )
130 508 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
131 508 : });
132 : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
133 : 1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
134 : ];
135 508 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
136 508 : register_histogram!(
137 508 : "safekeeper_broker_iteration_timelines",
138 508 : "Count of timelines pushed to the broker in a single iteration",
139 508 : TIMELINES_COUNT_BUCKETS.to_vec()
140 508 : )
141 508 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
142 508 : });
143 :
144 : pub const LABEL_UNKNOWN: &str = "unknown";
145 :
146 : /// Labels for traffic metrics.
147 0 : #[derive(Clone)]
148 : struct ConnectionLabels {
149 : /// Availability zone of the connection origin.
150 : client_az: String,
151 : /// Availability zone of the current safekeeper.
152 : sk_az: String,
153 : /// Client application name.
154 : app_name: String,
155 : }
156 :
157 : impl ConnectionLabels {
158 3288 : fn new() -> Self {
159 3288 : Self {
160 3288 : client_az: LABEL_UNKNOWN.to_string(),
161 3288 : sk_az: LABEL_UNKNOWN.to_string(),
162 3288 : app_name: LABEL_UNKNOWN.to_string(),
163 3288 : }
164 3288 : }
165 :
166 7320 : fn build_metrics(
167 7320 : &self,
168 7320 : ) -> (
169 7320 : GenericCounter<metrics::core::AtomicU64>,
170 7320 : GenericCounter<metrics::core::AtomicU64>,
171 7320 : ) {
172 7320 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
173 7320 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
174 8 : (client_az, sk_az) => {
175 8 : if client_az == sk_az {
176 0 : "true"
177 : } else {
178 8 : "false"
179 : }
180 : }
181 : };
182 :
183 7320 : let read = PG_IO_BYTES.with_label_values(&[
184 7320 : &self.client_az,
185 7320 : &self.sk_az,
186 7320 : &self.app_name,
187 7320 : "read",
188 7320 : same_az,
189 7320 : ]);
190 7320 : let write = PG_IO_BYTES.with_label_values(&[
191 7320 : &self.client_az,
192 7320 : &self.sk_az,
193 7320 : &self.app_name,
194 7320 : "write",
195 7320 : same_az,
196 7320 : ]);
197 7320 : (read, write)
198 7320 : }
199 : }
200 :
201 : struct TrafficMetricsState {
202 : /// Labels for traffic metrics.
203 : labels: ConnectionLabels,
204 : /// Total bytes read from this connection.
205 : read: GenericCounter<metrics::core::AtomicU64>,
206 : /// Total bytes written to this connection.
207 : write: GenericCounter<metrics::core::AtomicU64>,
208 : }
209 :
210 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
211 3288 : #[derive(Clone)]
212 : pub struct TrafficMetrics {
213 : state: Arc<RwLock<TrafficMetricsState>>,
214 : }
215 :
216 : impl Default for TrafficMetrics {
217 0 : fn default() -> Self {
218 0 : Self::new()
219 0 : }
220 : }
221 :
222 : impl TrafficMetrics {
223 3288 : pub fn new() -> Self {
224 3288 : let labels = ConnectionLabels::new();
225 3288 : let (read, write) = labels.build_metrics();
226 3288 : let state = TrafficMetricsState {
227 3288 : labels,
228 3288 : read,
229 3288 : write,
230 3288 : };
231 3288 : Self {
232 3288 : state: Arc::new(RwLock::new(state)),
233 3288 : }
234 3288 : }
235 :
236 4 : pub fn set_client_az(&self, value: &str) {
237 4 : let mut state = self.state.write().unwrap();
238 4 : state.labels.client_az = value.to_string();
239 4 : (state.read, state.write) = state.labels.build_metrics();
240 4 : }
241 :
242 3288 : pub fn set_sk_az(&self, value: &str) {
243 3288 : let mut state = self.state.write().unwrap();
244 3288 : state.labels.sk_az = value.to_string();
245 3288 : (state.read, state.write) = state.labels.build_metrics();
246 3288 : }
247 :
248 740 : pub fn set_app_name(&self, value: &str) {
249 740 : let mut state = self.state.write().unwrap();
250 740 : state.labels.app_name = value.to_string();
251 740 : (state.read, state.write) = state.labels.build_metrics();
252 740 : }
253 :
254 2939806 : pub fn observe_read(&self, cnt: usize) {
255 2939806 : self.state.read().unwrap().read.inc_by(cnt as u64)
256 2939806 : }
257 :
258 2481339 : pub fn observe_write(&self, cnt: usize) {
259 2481339 : self.state.read().unwrap().write.inc_by(cnt as u64)
260 2481339 : }
261 : }
262 :
263 : /// Metrics for WalStorage in a single timeline.
264 661 : #[derive(Clone, Default)]
265 : pub struct WalStorageMetrics {
266 : /// How much bytes were written in total.
267 : write_wal_bytes: u64,
268 : /// How much time spent writing WAL to disk, waiting for write(2).
269 : write_wal_seconds: f64,
270 : /// How much time spent syncing WAL to disk, waiting for fsync(2).
271 : flush_wal_seconds: f64,
272 : }
273 :
274 : impl WalStorageMetrics {
275 1712562 : pub fn observe_write_bytes(&mut self, bytes: usize) {
276 1712562 : self.write_wal_bytes += bytes as u64;
277 1712562 : WRITE_WAL_BYTES.observe(bytes as f64);
278 1712562 : }
279 :
280 1712562 : pub fn observe_write_seconds(&mut self, seconds: f64) {
281 1712562 : self.write_wal_seconds += seconds;
282 1712562 : WRITE_WAL_SECONDS.observe(seconds);
283 1712562 : }
284 :
285 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
286 0 : self.flush_wal_seconds += seconds;
287 0 : FLUSH_WAL_SECONDS.observe(seconds);
288 0 : }
289 : }
290 :
291 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
292 1712564 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
293 1712564 : closure: impl Future<Output = Result<(), E>>,
294 1712564 : ) -> Result<f64> {
295 1712564 : let start = std::time::Instant::now();
296 3147876 : closure.await.map_err(|e| e.into())?;
297 1712562 : Ok(start.elapsed().as_secs_f64())
298 1712564 : }
299 :
300 : /// Metrics for a single timeline.
301 0 : #[derive(Clone)]
302 : pub struct FullTimelineInfo {
303 : pub ttid: TenantTimelineId,
304 : pub ps_feedback: PageserverFeedback,
305 : pub wal_backup_active: bool,
306 : pub timeline_is_active: bool,
307 : pub num_computes: u32,
308 : pub last_removed_segno: XLogSegNo,
309 :
310 : pub epoch_start_lsn: Lsn,
311 : pub mem_state: TimelineMemState,
312 : pub persisted_state: TimelinePersistentState,
313 :
314 : pub flush_lsn: Lsn,
315 :
316 : pub wal_storage: WalStorageMetrics,
317 : }
318 :
319 : /// Collects metrics for all active timelines.
320 : pub struct TimelineCollector {
321 : descs: Vec<Desc>,
322 : commit_lsn: GenericGaugeVec<AtomicU64>,
323 : backup_lsn: GenericGaugeVec<AtomicU64>,
324 : flush_lsn: GenericGaugeVec<AtomicU64>,
325 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
326 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
327 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
328 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
329 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
330 : timeline_active: GenericGaugeVec<AtomicU64>,
331 : wal_backup_active: GenericGaugeVec<AtomicU64>,
332 : connected_computes: IntGaugeVec,
333 : disk_usage: GenericGaugeVec<AtomicU64>,
334 : acceptor_term: GenericGaugeVec<AtomicU64>,
335 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
336 : written_wal_seconds: GaugeVec,
337 : flushed_wal_seconds: GaugeVec,
338 : collect_timeline_metrics: Gauge,
339 : timelines_count: IntGauge,
340 : active_timelines_count: IntGauge,
341 : }
342 :
343 : impl Default for TimelineCollector {
344 0 : fn default() -> Self {
345 0 : Self::new()
346 0 : }
347 : }
348 :
349 : impl TimelineCollector {
350 508 : pub fn new() -> TimelineCollector {
351 508 : let mut descs = Vec::new();
352 508 :
353 508 : let commit_lsn = GenericGaugeVec::new(
354 508 : Opts::new(
355 508 : "safekeeper_commit_lsn",
356 508 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
357 508 : ),
358 508 : &["tenant_id", "timeline_id"],
359 508 : )
360 508 : .unwrap();
361 508 : descs.extend(commit_lsn.desc().into_iter().cloned());
362 508 :
363 508 : let backup_lsn = GenericGaugeVec::new(
364 508 : Opts::new(
365 508 : "safekeeper_backup_lsn",
366 508 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
367 508 : ),
368 508 : &["tenant_id", "timeline_id"],
369 508 : )
370 508 : .unwrap();
371 508 : descs.extend(backup_lsn.desc().into_iter().cloned());
372 508 :
373 508 : let flush_lsn = GenericGaugeVec::new(
374 508 : Opts::new(
375 508 : "safekeeper_flush_lsn",
376 508 : "Current flush_lsn, grouped by timeline",
377 508 : ),
378 508 : &["tenant_id", "timeline_id"],
379 508 : )
380 508 : .unwrap();
381 508 : descs.extend(flush_lsn.desc().into_iter().cloned());
382 508 :
383 508 : let epoch_start_lsn = GenericGaugeVec::new(
384 508 : Opts::new(
385 508 : "safekeeper_epoch_start_lsn",
386 508 : "Point since which compute generates new WAL in the current consensus term",
387 508 : ),
388 508 : &["tenant_id", "timeline_id"],
389 508 : )
390 508 : .unwrap();
391 508 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
392 508 :
393 508 : let peer_horizon_lsn = GenericGaugeVec::new(
394 508 : Opts::new(
395 508 : "safekeeper_peer_horizon_lsn",
396 508 : "LSN of the most lagging safekeeper",
397 508 : ),
398 508 : &["tenant_id", "timeline_id"],
399 508 : )
400 508 : .unwrap();
401 508 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
402 508 :
403 508 : let remote_consistent_lsn = GenericGaugeVec::new(
404 508 : Opts::new(
405 508 : "safekeeper_remote_consistent_lsn",
406 508 : "LSN which is persisted to the remote storage in pageserver",
407 508 : ),
408 508 : &["tenant_id", "timeline_id"],
409 508 : )
410 508 : .unwrap();
411 508 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
412 508 :
413 508 : let ps_last_received_lsn = GenericGaugeVec::new(
414 508 : Opts::new(
415 508 : "safekeeper_ps_last_received_lsn",
416 508 : "Last LSN received by the pageserver, acknowledged in the feedback",
417 508 : ),
418 508 : &["tenant_id", "timeline_id"],
419 508 : )
420 508 : .unwrap();
421 508 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
422 508 :
423 508 : let feedback_last_time_seconds = GenericGaugeVec::new(
424 508 : Opts::new(
425 508 : "safekeeper_feedback_last_time_seconds",
426 508 : "Timestamp of the last feedback from the pageserver",
427 508 : ),
428 508 : &["tenant_id", "timeline_id"],
429 508 : )
430 508 : .unwrap();
431 508 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
432 508 :
433 508 : let timeline_active = GenericGaugeVec::new(
434 508 : Opts::new(
435 508 : "safekeeper_timeline_active",
436 508 : "Reports 1 for active timelines, 0 for inactive",
437 508 : ),
438 508 : &["tenant_id", "timeline_id"],
439 508 : )
440 508 : .unwrap();
441 508 : descs.extend(timeline_active.desc().into_iter().cloned());
442 508 :
443 508 : let wal_backup_active = GenericGaugeVec::new(
444 508 : Opts::new(
445 508 : "safekeeper_wal_backup_active",
446 508 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
447 508 : ),
448 508 : &["tenant_id", "timeline_id"],
449 508 : )
450 508 : .unwrap();
451 508 : descs.extend(wal_backup_active.desc().into_iter().cloned());
452 508 :
453 508 : let connected_computes = IntGaugeVec::new(
454 508 : Opts::new(
455 508 : "safekeeper_connected_computes",
456 508 : "Number of active compute connections",
457 508 : ),
458 508 : &["tenant_id", "timeline_id"],
459 508 : )
460 508 : .unwrap();
461 508 : descs.extend(connected_computes.desc().into_iter().cloned());
462 508 :
463 508 : let disk_usage = GenericGaugeVec::new(
464 508 : Opts::new(
465 508 : "safekeeper_disk_usage_bytes",
466 508 : "Estimated disk space used to store WAL segments",
467 508 : ),
468 508 : &["tenant_id", "timeline_id"],
469 508 : )
470 508 : .unwrap();
471 508 : descs.extend(disk_usage.desc().into_iter().cloned());
472 508 :
473 508 : let acceptor_term = GenericGaugeVec::new(
474 508 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
475 508 : &["tenant_id", "timeline_id"],
476 508 : )
477 508 : .unwrap();
478 508 : descs.extend(acceptor_term.desc().into_iter().cloned());
479 508 :
480 508 : let written_wal_bytes = GenericGaugeVec::new(
481 508 : Opts::new(
482 508 : "safekeeper_written_wal_bytes_total",
483 508 : "Number of WAL bytes written to disk, grouped by timeline",
484 508 : ),
485 508 : &["tenant_id", "timeline_id"],
486 508 : )
487 508 : .unwrap();
488 508 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
489 508 :
490 508 : let written_wal_seconds = GaugeVec::new(
491 508 : Opts::new(
492 508 : "safekeeper_written_wal_seconds_total",
493 508 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
494 508 : ),
495 508 : &["tenant_id", "timeline_id"],
496 508 : )
497 508 : .unwrap();
498 508 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
499 508 :
500 508 : let flushed_wal_seconds = GaugeVec::new(
501 508 : Opts::new(
502 508 : "safekeeper_flushed_wal_seconds_total",
503 508 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
504 508 : ),
505 508 : &["tenant_id", "timeline_id"],
506 508 : )
507 508 : .unwrap();
508 508 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
509 508 :
510 508 : let collect_timeline_metrics = Gauge::new(
511 508 : "safekeeper_collect_timeline_metrics_seconds",
512 508 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
513 508 : )
514 508 : .unwrap();
515 508 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
516 508 :
517 508 : let timelines_count = IntGauge::new(
518 508 : "safekeeper_timelines",
519 508 : "Total number of timelines loaded in-memory",
520 508 : )
521 508 : .unwrap();
522 508 : descs.extend(timelines_count.desc().into_iter().cloned());
523 508 :
524 508 : let active_timelines_count = IntGauge::new(
525 508 : "safekeeper_active_timelines",
526 508 : "Total number of active timelines",
527 508 : )
528 508 : .unwrap();
529 508 : descs.extend(active_timelines_count.desc().into_iter().cloned());
530 508 :
531 508 : TimelineCollector {
532 508 : descs,
533 508 : commit_lsn,
534 508 : backup_lsn,
535 508 : flush_lsn,
536 508 : epoch_start_lsn,
537 508 : peer_horizon_lsn,
538 508 : remote_consistent_lsn,
539 508 : ps_last_received_lsn,
540 508 : feedback_last_time_seconds,
541 508 : timeline_active,
542 508 : wal_backup_active,
543 508 : connected_computes,
544 508 : disk_usage,
545 508 : acceptor_term,
546 508 : written_wal_bytes,
547 508 : written_wal_seconds,
548 508 : flushed_wal_seconds,
549 508 : collect_timeline_metrics,
550 508 : timelines_count,
551 508 : active_timelines_count,
552 508 : }
553 508 : }
554 : }
555 :
556 : impl Collector for TimelineCollector {
557 508 : fn desc(&self) -> Vec<&Desc> {
558 508 : self.descs.iter().collect()
559 508 : }
560 :
561 19 : fn collect(&self) -> Vec<MetricFamily> {
562 19 : let start_collecting = Instant::now();
563 19 :
564 19 : // reset all metrics to clean up inactive timelines
565 19 : self.commit_lsn.reset();
566 19 : self.backup_lsn.reset();
567 19 : self.flush_lsn.reset();
568 19 : self.epoch_start_lsn.reset();
569 19 : self.peer_horizon_lsn.reset();
570 19 : self.remote_consistent_lsn.reset();
571 19 : self.ps_last_received_lsn.reset();
572 19 : self.feedback_last_time_seconds.reset();
573 19 : self.timeline_active.reset();
574 19 : self.wal_backup_active.reset();
575 19 : self.connected_computes.reset();
576 19 : self.disk_usage.reset();
577 19 : self.acceptor_term.reset();
578 19 : self.written_wal_bytes.reset();
579 19 : self.written_wal_seconds.reset();
580 19 : self.flushed_wal_seconds.reset();
581 19 :
582 19 : let timelines = GlobalTimelines::get_all();
583 19 : let timelines_count = timelines.len();
584 19 : let mut active_timelines_count = 0;
585 19 :
586 19 : // Prometheus Collector is sync, and data is stored under async lock. To
587 19 : // bridge the gap with a crutch, collect data in spawned thread with
588 19 : // local tokio runtime.
589 19 : let infos = std::thread::spawn(|| {
590 19 : let rt = tokio::runtime::Builder::new_current_thread()
591 19 : .build()
592 19 : .expect("failed to create rt");
593 19 : rt.block_on(collect_timeline_metrics())
594 19 : })
595 19 : .join()
596 19 : .expect("collect_timeline_metrics thread panicked");
597 :
598 70 : for tli in &infos {
599 51 : let tenant_id = tli.ttid.tenant_id.to_string();
600 51 : let timeline_id = tli.ttid.timeline_id.to_string();
601 51 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
602 51 :
603 51 : if tli.timeline_is_active {
604 51 : active_timelines_count += 1;
605 51 : }
606 :
607 51 : self.commit_lsn
608 51 : .with_label_values(labels)
609 51 : .set(tli.mem_state.commit_lsn.into());
610 51 : self.backup_lsn
611 51 : .with_label_values(labels)
612 51 : .set(tli.mem_state.backup_lsn.into());
613 51 : self.flush_lsn
614 51 : .with_label_values(labels)
615 51 : .set(tli.flush_lsn.into());
616 51 : self.epoch_start_lsn
617 51 : .with_label_values(labels)
618 51 : .set(tli.epoch_start_lsn.into());
619 51 : self.peer_horizon_lsn
620 51 : .with_label_values(labels)
621 51 : .set(tli.mem_state.peer_horizon_lsn.into());
622 51 : self.remote_consistent_lsn
623 51 : .with_label_values(labels)
624 51 : .set(tli.mem_state.remote_consistent_lsn.into());
625 51 : self.timeline_active
626 51 : .with_label_values(labels)
627 51 : .set(tli.timeline_is_active as u64);
628 51 : self.wal_backup_active
629 51 : .with_label_values(labels)
630 51 : .set(tli.wal_backup_active as u64);
631 51 : self.connected_computes
632 51 : .with_label_values(labels)
633 51 : .set(tli.num_computes as i64);
634 51 : self.acceptor_term
635 51 : .with_label_values(labels)
636 51 : .set(tli.persisted_state.acceptor_state.term);
637 51 : self.written_wal_bytes
638 51 : .with_label_values(labels)
639 51 : .set(tli.wal_storage.write_wal_bytes);
640 51 : self.written_wal_seconds
641 51 : .with_label_values(labels)
642 51 : .set(tli.wal_storage.write_wal_seconds);
643 51 : self.flushed_wal_seconds
644 51 : .with_label_values(labels)
645 51 : .set(tli.wal_storage.flush_wal_seconds);
646 51 :
647 51 : self.ps_last_received_lsn
648 51 : .with_label_values(labels)
649 51 : .set(tli.ps_feedback.last_received_lsn.0);
650 51 : if let Ok(unix_time) = tli
651 51 : .ps_feedback
652 51 : .replytime
653 51 : .duration_since(SystemTime::UNIX_EPOCH)
654 51 : {
655 51 : self.feedback_last_time_seconds
656 51 : .with_label_values(labels)
657 51 : .set(unix_time.as_secs());
658 51 : }
659 :
660 51 : if tli.last_removed_segno != 0 {
661 0 : let segno_count = tli
662 0 : .flush_lsn
663 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
664 0 : - tli.last_removed_segno;
665 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
666 0 : self.disk_usage
667 0 : .with_label_values(labels)
668 0 : .set(disk_usage_bytes);
669 51 : }
670 : }
671 :
672 : // collect MetricFamilys.
673 19 : let mut mfs = Vec::new();
674 19 : mfs.extend(self.commit_lsn.collect());
675 19 : mfs.extend(self.backup_lsn.collect());
676 19 : mfs.extend(self.flush_lsn.collect());
677 19 : mfs.extend(self.epoch_start_lsn.collect());
678 19 : mfs.extend(self.peer_horizon_lsn.collect());
679 19 : mfs.extend(self.remote_consistent_lsn.collect());
680 19 : mfs.extend(self.ps_last_received_lsn.collect());
681 19 : mfs.extend(self.feedback_last_time_seconds.collect());
682 19 : mfs.extend(self.timeline_active.collect());
683 19 : mfs.extend(self.wal_backup_active.collect());
684 19 : mfs.extend(self.connected_computes.collect());
685 19 : mfs.extend(self.disk_usage.collect());
686 19 : mfs.extend(self.acceptor_term.collect());
687 19 : mfs.extend(self.written_wal_bytes.collect());
688 19 : mfs.extend(self.written_wal_seconds.collect());
689 19 : mfs.extend(self.flushed_wal_seconds.collect());
690 19 :
691 19 : // report time it took to collect all info
692 19 : let elapsed = start_collecting.elapsed().as_secs_f64();
693 19 : self.collect_timeline_metrics.set(elapsed);
694 19 : mfs.extend(self.collect_timeline_metrics.collect());
695 19 :
696 19 : // report total number of timelines
697 19 : self.timelines_count.set(timelines_count as i64);
698 19 : self.active_timelines_count
699 19 : .set(active_timelines_count as i64);
700 19 : mfs.extend(self.timelines_count.collect());
701 19 :
702 19 : mfs
703 19 : }
704 : }
705 :
706 19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
707 19 : let mut res = vec![];
708 19 : let timelines = GlobalTimelines::get_all();
709 :
710 70 : for tli in timelines {
711 51 : if let Some(info) = tli.info_for_metrics().await {
712 51 : res.push(info);
713 51 : }
714 : }
715 19 : res
716 19 : }
|