TLA Line data Source code
//! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
9 : use anyhow::Result;
10 : use futures::Future;
11 : use metrics::{
12 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
13 : proto::MetricFamily,
14 : register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
15 : IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
16 : };
17 : use once_cell::sync::Lazy;
18 :
19 : use postgres_ffi::XLogSegNo;
20 : use utils::pageserver_feedback::PageserverFeedback;
21 : use utils::{id::TenantTimelineId, lsn::Lsn};
22 :
23 : use crate::{
24 : safekeeper::{SafeKeeperState, SafekeeperMemState},
25 : GlobalTimelines,
26 : };
27 :
28 : // Global metrics across all timelines.
29 CBC 411 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
30 411 : register_histogram!(
31 411 : "safekeeper_write_wal_bytes",
32 411 : "Bytes written to WAL in a single request",
33 411 : vec![
34 411 : 1.0,
35 411 : 10.0,
36 411 : 100.0,
37 411 : 1024.0,
38 411 : 8192.0,
39 411 : 128.0 * 1024.0,
40 411 : 1024.0 * 1024.0,
41 411 : 10.0 * 1024.0 * 1024.0
42 411 : ]
43 411 : )
44 411 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
45 411 : });
46 411 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
47 411 : register_histogram!(
48 411 : "safekeeper_write_wal_seconds",
49 411 : "Seconds spent writing and syncing WAL to a disk in a single request",
50 411 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
51 411 : )
52 411 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
53 411 : });
54 UBC 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
55 0 : register_histogram!(
56 0 : "safekeeper_flush_wal_seconds",
57 0 : "Seconds spent syncing WAL to a disk",
58 0 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
59 0 : )
60 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
61 0 : });
62 CBC 416 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
63 416 : register_histogram!(
64 416 : "safekeeper_persist_control_file_seconds",
65 416 : "Seconds to persist and sync control file",
66 416 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
67 416 : )
68 416 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
69 416 : });
70 418 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
71 418 : register_int_counter_vec!(
72 418 : "safekeeper_pg_io_bytes_total",
73 418 : "Bytes read from or written to any PostgreSQL connection",
74 418 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
75 418 : )
76 418 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
77 418 : });
78 407 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
79 407 : register_int_counter!(
80 407 : "safekeeper_broker_pushed_updates_total",
81 407 : "Number of timeline updates pushed to the broker"
82 407 : )
83 407 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
84 407 : });
85 485 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
86 485 : register_int_counter_vec!(
87 485 : "safekeeper_broker_pulled_updates_total",
88 485 : "Number of timeline updates pulled and processed from the broker",
89 485 : &["result"]
90 485 : )
91 485 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
92 485 : });
93 418 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
94 836 : register_int_counter_pair_vec!(
95 836 : "safekeeper_pg_queries_received_total",
96 836 : "Number of queries received through pg protocol",
97 836 : "safekeeper_pg_queries_finished_total",
98 836 : "Number of queries finished through pg protocol",
99 836 : &["query"]
100 836 : )
101 418 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
102 418 : });
103 8 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
104 8 : register_int_counter!(
105 8 : "safekeeper_removed_wal_segments_total",
106 8 : "Number of WAL segments removed from the disk"
107 8 : )
108 8 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
109 8 : });
110 6 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
111 6 : register_int_counter!(
112 6 : "safekeeper_backed_up_segments_total",
113 6 : "Number of WAL segments backed up to the broker"
114 6 : )
115 6 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
116 6 : });
117 LBC (2) : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
118 (2) : register_int_counter!(
119 (2) : "safekeeper_backup_errors_total",
120 (2) : "Number of errors during backup"
121 (2) : )
122 (2) : .expect("Failed to register safekeeper_backup_errors_total counter")
123 (2) : });
124 CBC 485 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
125 485 : register_histogram!(
126 485 : "safekeeper_broker_push_update_seconds",
127 485 : "Seconds to push all timeline updates to the broker",
128 485 : DISK_WRITE_SECONDS_BUCKETS.to_vec()
129 485 : )
130 485 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
131 485 : });
132 : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
133 : 1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
134 : ];
135 485 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
136 485 : register_histogram!(
137 485 : "safekeeper_broker_iteration_timelines",
138 485 : "Count of timelines pushed to the broker in a single iteration",
139 485 : TIMELINES_COUNT_BUCKETS.to_vec()
140 485 : )
141 485 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
142 485 : });
143 :
144 : pub const LABEL_UNKNOWN: &str = "unknown";
145 :
146 : /// Labels for traffic metrics.
147 UBC 0 : #[derive(Clone)]
148 : struct ConnectionLabels {
149 : /// Availability zone of the connection origin.
150 : client_az: String,
151 : /// Availability zone of the current safekeeper.
152 : sk_az: String,
153 : /// Client application name.
154 : app_name: String,
155 : }
156 :
157 : impl ConnectionLabels {
158 CBC 3123 : fn new() -> Self {
159 3123 : Self {
160 3123 : client_az: LABEL_UNKNOWN.to_string(),
161 3123 : sk_az: LABEL_UNKNOWN.to_string(),
162 3123 : app_name: LABEL_UNKNOWN.to_string(),
163 3123 : }
164 3123 : }
165 :
166 6971 : fn build_metrics(
167 6971 : &self,
168 6971 : ) -> (
169 6971 : GenericCounter<metrics::core::AtomicU64>,
170 6971 : GenericCounter<metrics::core::AtomicU64>,
171 6971 : ) {
172 6971 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
173 6971 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
174 10 : (client_az, sk_az) => {
175 10 : if client_az == sk_az {
176 UBC 0 : "true"
177 : } else {
178 CBC 10 : "false"
179 : }
180 : }
181 : };
182 :
183 6971 : let read = PG_IO_BYTES.with_label_values(&[
184 6971 : &self.client_az,
185 6971 : &self.sk_az,
186 6971 : &self.app_name,
187 6971 : "read",
188 6971 : same_az,
189 6971 : ]);
190 6971 : let write = PG_IO_BYTES.with_label_values(&[
191 6971 : &self.client_az,
192 6971 : &self.sk_az,
193 6971 : &self.app_name,
194 6971 : "write",
195 6971 : same_az,
196 6971 : ]);
197 6971 : (read, write)
198 6971 : }
199 : }
200 :
201 : struct TrafficMetricsState {
202 : /// Labels for traffic metrics.
203 : labels: ConnectionLabels,
204 : /// Total bytes read from this connection.
205 : read: GenericCounter<metrics::core::AtomicU64>,
206 : /// Total bytes written to this connection.
207 : write: GenericCounter<metrics::core::AtomicU64>,
208 : }
209 :
210 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
211 3123 : #[derive(Clone)]
212 : pub struct TrafficMetrics {
213 : state: Arc<RwLock<TrafficMetricsState>>,
214 : }
215 :
216 : impl Default for TrafficMetrics {
217 UBC 0 : fn default() -> Self {
218 0 : Self::new()
219 0 : }
220 : }
221 :
222 : impl TrafficMetrics {
223 CBC 3123 : pub fn new() -> Self {
224 3123 : let labels = ConnectionLabels::new();
225 3123 : let (read, write) = labels.build_metrics();
226 3123 : let state = TrafficMetricsState {
227 3123 : labels,
228 3123 : read,
229 3123 : write,
230 3123 : };
231 3123 : Self {
232 3123 : state: Arc::new(RwLock::new(state)),
233 3123 : }
234 3123 : }
235 :
236 5 : pub fn set_client_az(&self, value: &str) {
237 5 : let mut state = self.state.write().unwrap();
238 5 : state.labels.client_az = value.to_string();
239 5 : (state.read, state.write) = state.labels.build_metrics();
240 5 : }
241 :
242 3123 : pub fn set_sk_az(&self, value: &str) {
243 3123 : let mut state = self.state.write().unwrap();
244 3123 : state.labels.sk_az = value.to_string();
245 3123 : (state.read, state.write) = state.labels.build_metrics();
246 3123 : }
247 :
248 720 : pub fn set_app_name(&self, value: &str) {
249 720 : let mut state = self.state.write().unwrap();
250 720 : state.labels.app_name = value.to_string();
251 720 : (state.read, state.write) = state.labels.build_metrics();
252 720 : }
253 :
254 2177218 : pub fn observe_read(&self, cnt: usize) {
255 2177218 : self.state.read().unwrap().read.inc_by(cnt as u64)
256 2177218 : }
257 :
258 1883400 : pub fn observe_write(&self, cnt: usize) {
259 1883400 : self.state.read().unwrap().write.inc_by(cnt as u64)
260 1883400 : }
261 : }
262 :
/// Metrics for WalStorage in a single timeline.
///
/// Every field is a running total for this timeline since the value was created.
#[derive(Clone, Debug, Default)]
pub struct WalStorageMetrics {
    /// How many bytes were written in total.
    write_wal_bytes: u64,
    /// Total time, in seconds, spent writing WAL to disk (waiting for write(2)).
    write_wal_seconds: f64,
    /// Total time, in seconds, spent syncing WAL to disk (waiting for fsync(2)).
    flush_wal_seconds: f64,
}
273 :
274 : impl WalStorageMetrics {
275 1216618 : pub fn observe_write_bytes(&mut self, bytes: usize) {
276 1216618 : self.write_wal_bytes += bytes as u64;
277 1216618 : WRITE_WAL_BYTES.observe(bytes as f64);
278 1216618 : }
279 :
280 1216618 : pub fn observe_write_seconds(&mut self, seconds: f64) {
281 1216618 : self.write_wal_seconds += seconds;
282 1216618 : WRITE_WAL_SECONDS.observe(seconds);
283 1216618 : }
284 :
285 UBC 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
286 0 : self.flush_wal_seconds += seconds;
287 0 : FLUSH_WAL_SECONDS.observe(seconds);
288 0 : }
289 : }
290 :
291 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
292 CBC 1216619 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
293 1216619 : closure: impl Future<Output = Result<(), E>>,
294 1216619 : ) -> Result<f64> {
295 1216619 : let start = std::time::Instant::now();
296 2485345 : closure.await.map_err(|e| e.into())?;
297 1216618 : Ok(start.elapsed().as_secs_f64())
298 1216618 : }
299 :
300 : /// Metrics for a single timeline.
301 UBC 0 : #[derive(Clone)]
302 : pub struct FullTimelineInfo {
303 : pub ttid: TenantTimelineId,
304 : pub ps_feedback: PageserverFeedback,
305 : pub wal_backup_active: bool,
306 : pub timeline_is_active: bool,
307 : pub num_computes: u32,
308 : pub last_removed_segno: XLogSegNo,
309 :
310 : pub epoch_start_lsn: Lsn,
311 : pub mem_state: SafekeeperMemState,
312 : pub persisted_state: SafeKeeperState,
313 :
314 : pub flush_lsn: Lsn,
315 : pub remote_consistent_lsn: Lsn,
316 :
317 : pub wal_storage: WalStorageMetrics,
318 : }
319 :
320 : /// Collects metrics for all active timelines.
321 : pub struct TimelineCollector {
322 : descs: Vec<Desc>,
323 : commit_lsn: GenericGaugeVec<AtomicU64>,
324 : backup_lsn: GenericGaugeVec<AtomicU64>,
325 : flush_lsn: GenericGaugeVec<AtomicU64>,
326 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
327 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
328 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
329 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
330 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
331 : timeline_active: GenericGaugeVec<AtomicU64>,
332 : wal_backup_active: GenericGaugeVec<AtomicU64>,
333 : connected_computes: IntGaugeVec,
334 : disk_usage: GenericGaugeVec<AtomicU64>,
335 : acceptor_term: GenericGaugeVec<AtomicU64>,
336 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
337 : written_wal_seconds: GaugeVec,
338 : flushed_wal_seconds: GaugeVec,
339 : collect_timeline_metrics: Gauge,
340 : timelines_count: IntGauge,
341 : }
342 :
343 : impl Default for TimelineCollector {
344 0 : fn default() -> Self {
345 0 : Self::new()
346 0 : }
347 : }
348 :
349 : impl TimelineCollector {
350 CBC 485 : pub fn new() -> TimelineCollector {
351 485 : let mut descs = Vec::new();
352 485 :
353 485 : let commit_lsn = GenericGaugeVec::new(
354 485 : Opts::new(
355 485 : "safekeeper_commit_lsn",
356 485 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
357 485 : ),
358 485 : &["tenant_id", "timeline_id"],
359 485 : )
360 485 : .unwrap();
361 485 : descs.extend(commit_lsn.desc().into_iter().cloned());
362 485 :
363 485 : let backup_lsn = GenericGaugeVec::new(
364 485 : Opts::new(
365 485 : "safekeeper_backup_lsn",
366 485 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
367 485 : ),
368 485 : &["tenant_id", "timeline_id"],
369 485 : )
370 485 : .unwrap();
371 485 : descs.extend(backup_lsn.desc().into_iter().cloned());
372 485 :
373 485 : let flush_lsn = GenericGaugeVec::new(
374 485 : Opts::new(
375 485 : "safekeeper_flush_lsn",
376 485 : "Current flush_lsn, grouped by timeline",
377 485 : ),
378 485 : &["tenant_id", "timeline_id"],
379 485 : )
380 485 : .unwrap();
381 485 : descs.extend(flush_lsn.desc().into_iter().cloned());
382 485 :
383 485 : let epoch_start_lsn = GenericGaugeVec::new(
384 485 : Opts::new(
385 485 : "safekeeper_epoch_start_lsn",
386 485 : "Point since which compute generates new WAL in the current consensus term",
387 485 : ),
388 485 : &["tenant_id", "timeline_id"],
389 485 : )
390 485 : .unwrap();
391 485 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
392 485 :
393 485 : let peer_horizon_lsn = GenericGaugeVec::new(
394 485 : Opts::new(
395 485 : "safekeeper_peer_horizon_lsn",
396 485 : "LSN of the most lagging safekeeper",
397 485 : ),
398 485 : &["tenant_id", "timeline_id"],
399 485 : )
400 485 : .unwrap();
401 485 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
402 485 :
403 485 : let remote_consistent_lsn = GenericGaugeVec::new(
404 485 : Opts::new(
405 485 : "safekeeper_remote_consistent_lsn",
406 485 : "LSN which is persisted to the remote storage in pageserver",
407 485 : ),
408 485 : &["tenant_id", "timeline_id"],
409 485 : )
410 485 : .unwrap();
411 485 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
412 485 :
413 485 : let ps_last_received_lsn = GenericGaugeVec::new(
414 485 : Opts::new(
415 485 : "safekeeper_ps_last_received_lsn",
416 485 : "Last LSN received by the pageserver, acknowledged in the feedback",
417 485 : ),
418 485 : &["tenant_id", "timeline_id"],
419 485 : )
420 485 : .unwrap();
421 485 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
422 485 :
423 485 : let feedback_last_time_seconds = GenericGaugeVec::new(
424 485 : Opts::new(
425 485 : "safekeeper_feedback_last_time_seconds",
426 485 : "Timestamp of the last feedback from the pageserver",
427 485 : ),
428 485 : &["tenant_id", "timeline_id"],
429 485 : )
430 485 : .unwrap();
431 485 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
432 485 :
433 485 : let timeline_active = GenericGaugeVec::new(
434 485 : Opts::new(
435 485 : "safekeeper_timeline_active",
436 485 : "Reports 1 for active timelines, 0 for inactive",
437 485 : ),
438 485 : &["tenant_id", "timeline_id"],
439 485 : )
440 485 : .unwrap();
441 485 : descs.extend(timeline_active.desc().into_iter().cloned());
442 485 :
443 485 : let wal_backup_active = GenericGaugeVec::new(
444 485 : Opts::new(
445 485 : "safekeeper_wal_backup_active",
446 485 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
447 485 : ),
448 485 : &["tenant_id", "timeline_id"],
449 485 : )
450 485 : .unwrap();
451 485 : descs.extend(wal_backup_active.desc().into_iter().cloned());
452 485 :
453 485 : let connected_computes = IntGaugeVec::new(
454 485 : Opts::new(
455 485 : "safekeeper_connected_computes",
456 485 : "Number of active compute connections",
457 485 : ),
458 485 : &["tenant_id", "timeline_id"],
459 485 : )
460 485 : .unwrap();
461 485 : descs.extend(connected_computes.desc().into_iter().cloned());
462 485 :
463 485 : let disk_usage = GenericGaugeVec::new(
464 485 : Opts::new(
465 485 : "safekeeper_disk_usage_bytes",
466 485 : "Estimated disk space used to store WAL segments",
467 485 : ),
468 485 : &["tenant_id", "timeline_id"],
469 485 : )
470 485 : .unwrap();
471 485 : descs.extend(disk_usage.desc().into_iter().cloned());
472 485 :
473 485 : let acceptor_term = GenericGaugeVec::new(
474 485 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
475 485 : &["tenant_id", "timeline_id"],
476 485 : )
477 485 : .unwrap();
478 485 : descs.extend(acceptor_term.desc().into_iter().cloned());
479 485 :
480 485 : let written_wal_bytes = GenericGaugeVec::new(
481 485 : Opts::new(
482 485 : "safekeeper_written_wal_bytes_total",
483 485 : "Number of WAL bytes written to disk, grouped by timeline",
484 485 : ),
485 485 : &["tenant_id", "timeline_id"],
486 485 : )
487 485 : .unwrap();
488 485 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
489 485 :
490 485 : let written_wal_seconds = GaugeVec::new(
491 485 : Opts::new(
492 485 : "safekeeper_written_wal_seconds_total",
493 485 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
494 485 : ),
495 485 : &["tenant_id", "timeline_id"],
496 485 : )
497 485 : .unwrap();
498 485 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
499 485 :
500 485 : let flushed_wal_seconds = GaugeVec::new(
501 485 : Opts::new(
502 485 : "safekeeper_flushed_wal_seconds_total",
503 485 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
504 485 : ),
505 485 : &["tenant_id", "timeline_id"],
506 485 : )
507 485 : .unwrap();
508 485 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
509 485 :
510 485 : let collect_timeline_metrics = Gauge::new(
511 485 : "safekeeper_collect_timeline_metrics_seconds",
512 485 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
513 485 : )
514 485 : .unwrap();
515 485 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
516 485 :
517 485 : let timelines_count = IntGauge::new(
518 485 : "safekeeper_timelines",
519 485 : "Total number of timelines loaded in-memory",
520 485 : )
521 485 : .unwrap();
522 485 : descs.extend(timelines_count.desc().into_iter().cloned());
523 485 :
524 485 : TimelineCollector {
525 485 : descs,
526 485 : commit_lsn,
527 485 : backup_lsn,
528 485 : flush_lsn,
529 485 : epoch_start_lsn,
530 485 : peer_horizon_lsn,
531 485 : remote_consistent_lsn,
532 485 : ps_last_received_lsn,
533 485 : feedback_last_time_seconds,
534 485 : timeline_active,
535 485 : wal_backup_active,
536 485 : connected_computes,
537 485 : disk_usage,
538 485 : acceptor_term,
539 485 : written_wal_bytes,
540 485 : written_wal_seconds,
541 485 : flushed_wal_seconds,
542 485 : collect_timeline_metrics,
543 485 : timelines_count,
544 485 : }
545 485 : }
546 : }
547 :
548 : impl Collector for TimelineCollector {
549 485 : fn desc(&self) -> Vec<&Desc> {
550 485 : self.descs.iter().collect()
551 485 : }
552 :
553 19 : fn collect(&self) -> Vec<MetricFamily> {
554 19 : let start_collecting = Instant::now();
555 19 :
556 19 : // reset all metrics to clean up inactive timelines
557 19 : self.commit_lsn.reset();
558 19 : self.backup_lsn.reset();
559 19 : self.flush_lsn.reset();
560 19 : self.epoch_start_lsn.reset();
561 19 : self.peer_horizon_lsn.reset();
562 19 : self.remote_consistent_lsn.reset();
563 19 : self.ps_last_received_lsn.reset();
564 19 : self.feedback_last_time_seconds.reset();
565 19 : self.timeline_active.reset();
566 19 : self.wal_backup_active.reset();
567 19 : self.connected_computes.reset();
568 19 : self.disk_usage.reset();
569 19 : self.acceptor_term.reset();
570 19 : self.written_wal_bytes.reset();
571 19 : self.written_wal_seconds.reset();
572 19 : self.flushed_wal_seconds.reset();
573 19 :
574 19 : let timelines = GlobalTimelines::get_all();
575 19 : let timelines_count = timelines.len();
576 19 :
577 19 : // Prometheus Collector is sync, and data is stored under async lock. To
578 19 : // bridge the gap with a crutch, collect data in spawned thread with
579 19 : // local tokio runtime.
580 19 : let infos = std::thread::spawn(|| {
581 19 : let rt = tokio::runtime::Builder::new_current_thread()
582 19 : .build()
583 19 : .expect("failed to create rt");
584 19 : rt.block_on(collect_timeline_metrics())
585 19 : })
586 19 : .join()
587 19 : .expect("collect_timeline_metrics thread panicked");
588 :
589 70 : for tli in &infos {
590 51 : let tenant_id = tli.ttid.tenant_id.to_string();
591 51 : let timeline_id = tli.ttid.timeline_id.to_string();
592 51 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
593 51 :
594 51 : self.commit_lsn
595 51 : .with_label_values(labels)
596 51 : .set(tli.mem_state.commit_lsn.into());
597 51 : self.backup_lsn
598 51 : .with_label_values(labels)
599 51 : .set(tli.mem_state.backup_lsn.into());
600 51 : self.flush_lsn
601 51 : .with_label_values(labels)
602 51 : .set(tli.flush_lsn.into());
603 51 : self.epoch_start_lsn
604 51 : .with_label_values(labels)
605 51 : .set(tli.epoch_start_lsn.into());
606 51 : self.peer_horizon_lsn
607 51 : .with_label_values(labels)
608 51 : .set(tli.mem_state.peer_horizon_lsn.into());
609 51 : self.remote_consistent_lsn
610 51 : .with_label_values(labels)
611 51 : .set(tli.remote_consistent_lsn.into());
612 51 : self.timeline_active
613 51 : .with_label_values(labels)
614 51 : .set(tli.timeline_is_active as u64);
615 51 : self.wal_backup_active
616 51 : .with_label_values(labels)
617 51 : .set(tli.wal_backup_active as u64);
618 51 : self.connected_computes
619 51 : .with_label_values(labels)
620 51 : .set(tli.num_computes as i64);
621 51 : self.acceptor_term
622 51 : .with_label_values(labels)
623 51 : .set(tli.persisted_state.acceptor_state.term);
624 51 : self.written_wal_bytes
625 51 : .with_label_values(labels)
626 51 : .set(tli.wal_storage.write_wal_bytes);
627 51 : self.written_wal_seconds
628 51 : .with_label_values(labels)
629 51 : .set(tli.wal_storage.write_wal_seconds);
630 51 : self.flushed_wal_seconds
631 51 : .with_label_values(labels)
632 51 : .set(tli.wal_storage.flush_wal_seconds);
633 51 :
634 51 : self.ps_last_received_lsn
635 51 : .with_label_values(labels)
636 51 : .set(tli.ps_feedback.last_received_lsn.0);
637 51 : if let Ok(unix_time) = tli
638 51 : .ps_feedback
639 51 : .replytime
640 51 : .duration_since(SystemTime::UNIX_EPOCH)
641 51 : {
642 51 : self.feedback_last_time_seconds
643 51 : .with_label_values(labels)
644 51 : .set(unix_time.as_secs());
645 51 : }
646 :
647 51 : if tli.last_removed_segno != 0 {
648 UBC 0 : let segno_count = tli
649 0 : .flush_lsn
650 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
651 0 : - tli.last_removed_segno;
652 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
653 0 : self.disk_usage
654 0 : .with_label_values(labels)
655 0 : .set(disk_usage_bytes);
656 CBC 51 : }
657 : }
658 :
659 : // collect MetricFamilys.
660 19 : let mut mfs = Vec::new();
661 19 : mfs.extend(self.commit_lsn.collect());
662 19 : mfs.extend(self.backup_lsn.collect());
663 19 : mfs.extend(self.flush_lsn.collect());
664 19 : mfs.extend(self.epoch_start_lsn.collect());
665 19 : mfs.extend(self.peer_horizon_lsn.collect());
666 19 : mfs.extend(self.remote_consistent_lsn.collect());
667 19 : mfs.extend(self.ps_last_received_lsn.collect());
668 19 : mfs.extend(self.feedback_last_time_seconds.collect());
669 19 : mfs.extend(self.timeline_active.collect());
670 19 : mfs.extend(self.wal_backup_active.collect());
671 19 : mfs.extend(self.connected_computes.collect());
672 19 : mfs.extend(self.disk_usage.collect());
673 19 : mfs.extend(self.acceptor_term.collect());
674 19 : mfs.extend(self.written_wal_bytes.collect());
675 19 : mfs.extend(self.written_wal_seconds.collect());
676 19 : mfs.extend(self.flushed_wal_seconds.collect());
677 19 :
678 19 : // report time it took to collect all info
679 19 : let elapsed = start_collecting.elapsed().as_secs_f64();
680 19 : self.collect_timeline_metrics.set(elapsed);
681 19 : mfs.extend(self.collect_timeline_metrics.collect());
682 19 :
683 19 : // report total number of timelines
684 19 : self.timelines_count.set(timelines_count as i64);
685 19 : mfs.extend(self.timelines_count.collect());
686 19 :
687 19 : mfs
688 19 : }
689 : }
690 :
691 19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
692 19 : let mut res = vec![];
693 19 : let timelines = GlobalTimelines::get_all();
694 :
695 70 : for tli in timelines {
696 51 : if let Some(info) = tli.info_for_metrics().await {
697 51 : res.push(info);
698 51 : }
699 : }
700 19 : res
701 19 : }
|