Line data Source code
//! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use anyhow::Result;
9 : use futures::Future;
10 : use metrics::{
11 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
12 : pow2_buckets,
13 : proto::MetricFamily,
14 : register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
15 : register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge, GaugeVec,
16 : Histogram, HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
17 : IntGauge, IntGaugeVec, DISK_FSYNC_SECONDS_BUCKETS,
18 : };
19 : use once_cell::sync::Lazy;
20 : use postgres_ffi::XLogSegNo;
21 : use utils::{id::TenantTimelineId, lsn::Lsn, pageserver_feedback::PageserverFeedback};
22 :
23 : use crate::{
24 : receive_wal::MSG_QUEUE_SIZE,
25 : state::{TimelineMemState, TimelinePersistentState},
26 : GlobalTimelines,
27 : };
28 :
29 : // Global metrics across all timelines.
30 0 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
31 0 : register_histogram!(
32 0 : "safekeeper_write_wal_bytes",
33 0 : "Bytes written to WAL in a single request",
34 0 : vec![
35 0 : 1.0,
36 0 : 10.0,
37 0 : 100.0,
38 0 : 1024.0,
39 0 : 8192.0,
40 0 : 128.0 * 1024.0,
41 0 : 1024.0 * 1024.0,
42 0 : 10.0 * 1024.0 * 1024.0
43 0 : ]
44 0 : )
45 0 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
46 0 : });
47 0 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
48 0 : register_histogram!(
49 0 : "safekeeper_write_wal_seconds",
50 0 : "Seconds spent writing and syncing WAL to a disk in a single request",
51 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
52 0 : )
53 0 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
54 0 : });
55 0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
56 0 : register_histogram!(
57 0 : "safekeeper_flush_wal_seconds",
58 0 : "Seconds spent syncing WAL to a disk (excluding segment initialization)",
59 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
60 0 : )
61 0 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
62 0 : });
63 2 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
64 2 : register_histogram!(
65 2 : "safekeeper_persist_control_file_seconds",
66 2 : "Seconds to persist and sync control file",
67 2 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
68 2 : )
69 2 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
70 2 : });
71 0 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
72 0 : register_histogram_vec!(
73 0 : "safekeeper_wal_storage_operation_seconds",
74 0 : "Seconds spent on WAL storage operations",
75 0 : &["operation"],
76 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
77 0 : )
78 0 : .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
79 0 : });
80 10 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
81 10 : register_histogram_vec!(
82 10 : "safekeeper_misc_operation_seconds",
83 10 : "Seconds spent on miscellaneous operations",
84 10 : &["operation"],
85 10 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
86 10 : )
87 10 : .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
88 10 : });
89 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
90 0 : register_int_counter_vec!(
91 0 : "safekeeper_pg_io_bytes_total",
92 0 : "Bytes read from or written to any PostgreSQL connection",
93 0 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
94 0 : )
95 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
96 0 : });
97 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
98 0 : register_int_counter!(
99 0 : "safekeeper_broker_pushed_updates_total",
100 0 : "Number of timeline updates pushed to the broker"
101 0 : )
102 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
103 0 : });
104 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
105 0 : register_int_counter_vec!(
106 0 : "safekeeper_broker_pulled_updates_total",
107 0 : "Number of timeline updates pulled and processed from the broker",
108 0 : &["result"]
109 0 : )
110 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
111 0 : });
112 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
113 0 : register_int_counter_pair_vec!(
114 0 : "safekeeper_pg_queries_received_total",
115 0 : "Number of queries received through pg protocol",
116 0 : "safekeeper_pg_queries_finished_total",
117 0 : "Number of queries finished through pg protocol",
118 0 : &["query"]
119 0 : )
120 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
121 0 : });
122 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
123 0 : register_int_counter!(
124 0 : "safekeeper_removed_wal_segments_total",
125 0 : "Number of WAL segments removed from the disk"
126 0 : )
127 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
128 0 : });
129 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
130 0 : register_int_counter!(
131 0 : "safekeeper_backed_up_segments_total",
132 0 : "Number of WAL segments backed up to the S3"
133 0 : )
134 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
135 0 : });
136 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
137 0 : register_int_counter!(
138 0 : "safekeeper_backup_errors_total",
139 0 : "Number of errors during backup"
140 0 : )
141 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
142 0 : });
143 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
144 0 : register_histogram!(
145 0 : "safekeeper_broker_push_update_seconds",
146 0 : "Seconds to push all timeline updates to the broker",
147 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
148 0 : )
149 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
150 0 : });
/// Histogram bucket upper bounds for metrics that count timelines (1 up to 50 000).
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1.0, 10.0, 50.0, 100.0, 200.0, 500.0,
    1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
];
154 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
155 0 : register_histogram!(
156 0 : "safekeeper_broker_iteration_timelines",
157 0 : "Count of timelines pushed to the broker in a single iteration",
158 0 : TIMELINES_COUNT_BUCKETS.to_vec()
159 0 : )
160 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
161 0 : });
162 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
163 0 : register_int_counter!(
164 0 : "safekeeper_received_ps_feedbacks_total",
165 0 : "Number of pageserver feedbacks received"
166 0 : )
167 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
168 0 : });
169 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
170 0 : register_int_counter_vec!(
171 0 : "safekeeper_partial_backup_uploads_total",
172 0 : "Number of partial backup uploads to the S3",
173 0 : &["result"]
174 0 : )
175 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
176 0 : });
177 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
178 0 : register_int_counter!(
179 0 : "safekeeper_partial_backup_uploaded_bytes_total",
180 0 : "Number of bytes uploaded to the S3 during partial backup"
181 0 : )
182 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
183 0 : });
184 0 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
185 0 : register_int_counter!(
186 0 : "safekeeper_manager_iterations_total",
187 0 : "Number of iterations of the timeline manager task"
188 0 : )
189 0 : .expect("Failed to register safekeeper_manager_iterations_total counter")
190 0 : });
191 0 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
192 0 : register_int_counter!(
193 0 : "safekeeper_manager_active_changes_total",
194 0 : "Number of timeline active status changes in the timeline manager task"
195 0 : )
196 0 : .expect("Failed to register safekeeper_manager_active_changes_total counter")
197 0 : });
198 0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
199 0 : register_int_counter_pair!(
200 0 : "safekeeper_wal_backup_tasks_started_total",
201 0 : "Number of active WAL backup tasks",
202 0 : "safekeeper_wal_backup_tasks_finished_total",
203 0 : "Number of finished WAL backup tasks",
204 0 : )
205 0 : .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
206 0 : });
207 0 : pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
208 0 : register_int_gauge!(
209 0 : "safekeeper_wal_receivers",
210 0 : "Number of currently connected WAL receivers (i.e. connected computes)"
211 0 : )
212 0 : .expect("Failed to register safekeeper_wal_receivers")
213 0 : });
214 0 : pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
215 0 : // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
216 0 : // full queues respectively.
217 0 : let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
218 0 : buckets.insert(0, 0.0);
219 0 : buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
220 0 : assert!(buckets.len() <= 12, "too many histogram buckets");
221 :
222 0 : register_histogram!(
223 0 : "safekeeper_wal_receiver_queue_depth",
224 0 : "Number of queued messages per WAL receiver (sampled every 5 seconds)",
225 0 : buckets
226 0 : )
227 0 : .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
228 0 : });
229 0 : pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
230 0 : register_int_gauge!(
231 0 : "safekeeper_wal_receiver_queue_depth_total",
232 0 : "Total number of queued messages across all WAL receivers",
233 0 : )
234 0 : .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
235 0 : });
236 : // TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
237 : // MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
238 0 : pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
239 0 : register_int_gauge!(
240 0 : "safekeeper_wal_receiver_queue_size_total",
241 0 : "Total memory byte size of queued messages across all WAL receivers",
242 0 : )
243 0 : .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
244 0 : });
245 :
// Metrics for timeline eviction events.
247 0 : #[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
248 : #[strum(serialize_all = "kebab_case")]
249 : pub(crate) enum EvictionEvent {
250 : Evict,
251 : Restore,
252 : }
253 :
254 0 : pub(crate) static EVICTION_EVENTS_STARTED: Lazy<IntCounterVec> = Lazy::new(|| {
255 0 : register_int_counter_vec!(
256 0 : "safekeeper_eviction_events_started_total",
257 0 : "Number of eviction state changes, incremented when they start",
258 0 : &["kind"]
259 0 : )
260 0 : .expect("Failed to register metric")
261 0 : });
262 :
263 0 : pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(|| {
264 0 : register_int_counter_vec!(
265 0 : "safekeeper_eviction_events_completed_total",
266 0 : "Number of eviction state changes, incremented when they complete",
267 0 : &["kind"]
268 0 : )
269 0 : .expect("Failed to register metric")
270 0 : });
271 :
272 0 : pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
273 0 : register_int_gauge!(
274 0 : "safekeeper_evicted_timelines",
275 0 : "Number of currently evicted timelines"
276 0 : )
277 0 : .expect("Failed to register metric")
278 0 : });
279 :
/// Placeholder label value used until the real value of a connection label is known.
pub const LABEL_UNKNOWN: &str = "unknown";
281 :
/// Label values attached to the per-connection traffic counters.
///
/// Every field starts out as [`LABEL_UNKNOWN`] (see `ConnectionLabels::new`).
#[derive(Clone)]
struct ConnectionLabels {
    /// Availability zone the connection originates from.
    client_az: String,
    /// Availability zone this safekeeper runs in.
    sk_az: String,
    /// Application name reported by the client.
    app_name: String,
}
292 :
293 : impl ConnectionLabels {
294 0 : fn new() -> Self {
295 0 : Self {
296 0 : client_az: LABEL_UNKNOWN.to_string(),
297 0 : sk_az: LABEL_UNKNOWN.to_string(),
298 0 : app_name: LABEL_UNKNOWN.to_string(),
299 0 : }
300 0 : }
301 :
302 0 : fn build_metrics(
303 0 : &self,
304 0 : ) -> (
305 0 : GenericCounter<metrics::core::AtomicU64>,
306 0 : GenericCounter<metrics::core::AtomicU64>,
307 0 : ) {
308 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
309 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
310 0 : (client_az, sk_az) => {
311 0 : if client_az == sk_az {
312 0 : "true"
313 : } else {
314 0 : "false"
315 : }
316 : }
317 : };
318 :
319 0 : let read = PG_IO_BYTES.with_label_values(&[
320 0 : &self.client_az,
321 0 : &self.sk_az,
322 0 : &self.app_name,
323 0 : "read",
324 0 : same_az,
325 0 : ]);
326 0 : let write = PG_IO_BYTES.with_label_values(&[
327 0 : &self.client_az,
328 0 : &self.sk_az,
329 0 : &self.app_name,
330 0 : "write",
331 0 : same_az,
332 0 : ]);
333 0 : (read, write)
334 0 : }
335 : }
336 :
337 : struct TrafficMetricsState {
338 : /// Labels for traffic metrics.
339 : labels: ConnectionLabels,
340 : /// Total bytes read from this connection.
341 : read: GenericCounter<metrics::core::AtomicU64>,
342 : /// Total bytes written to this connection.
343 : write: GenericCounter<metrics::core::AtomicU64>,
344 : }
345 :
346 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
347 : #[derive(Clone)]
348 : pub struct TrafficMetrics {
349 : state: Arc<RwLock<TrafficMetricsState>>,
350 : }
351 :
352 : impl Default for TrafficMetrics {
353 0 : fn default() -> Self {
354 0 : Self::new()
355 0 : }
356 : }
357 :
358 : impl TrafficMetrics {
359 0 : pub fn new() -> Self {
360 0 : let labels = ConnectionLabels::new();
361 0 : let (read, write) = labels.build_metrics();
362 0 : let state = TrafficMetricsState {
363 0 : labels,
364 0 : read,
365 0 : write,
366 0 : };
367 0 : Self {
368 0 : state: Arc::new(RwLock::new(state)),
369 0 : }
370 0 : }
371 :
372 0 : pub fn set_client_az(&self, value: &str) {
373 0 : let mut state = self.state.write().unwrap();
374 0 : state.labels.client_az = value.to_string();
375 0 : (state.read, state.write) = state.labels.build_metrics();
376 0 : }
377 :
378 0 : pub fn set_sk_az(&self, value: &str) {
379 0 : let mut state = self.state.write().unwrap();
380 0 : state.labels.sk_az = value.to_string();
381 0 : (state.read, state.write) = state.labels.build_metrics();
382 0 : }
383 :
384 0 : pub fn set_app_name(&self, value: &str) {
385 0 : let mut state = self.state.write().unwrap();
386 0 : state.labels.app_name = value.to_string();
387 0 : (state.read, state.write) = state.labels.build_metrics();
388 0 : }
389 :
390 0 : pub fn observe_read(&self, cnt: usize) {
391 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
392 0 : }
393 :
394 0 : pub fn observe_write(&self, cnt: usize) {
395 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
396 0 : }
397 : }
398 :
/// Running totals for the WalStorage of a single timeline.
#[derive(Clone, Default)]
pub struct WalStorageMetrics {
    /// Total number of bytes written.
    write_wal_bytes: u64,
    /// Total time spent writing WAL to disk, waiting for write(2), in seconds.
    write_wal_seconds: f64,
    /// Total time spent syncing WAL to disk, waiting for fsync(2), in seconds.
    flush_wal_seconds: f64,
}
409 :
410 : impl WalStorageMetrics {
411 0 : pub fn observe_write_bytes(&mut self, bytes: usize) {
412 0 : self.write_wal_bytes += bytes as u64;
413 0 : WRITE_WAL_BYTES.observe(bytes as f64);
414 0 : }
415 :
416 0 : pub fn observe_write_seconds(&mut self, seconds: f64) {
417 0 : self.write_wal_seconds += seconds;
418 0 : WRITE_WAL_SECONDS.observe(seconds);
419 0 : }
420 :
421 0 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
422 0 : self.flush_wal_seconds += seconds;
423 0 : FLUSH_WAL_SECONDS.observe(seconds);
424 0 : }
425 : }
426 :
427 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
428 0 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
429 0 : closure: impl Future<Output = Result<(), E>>,
430 0 : ) -> Result<f64> {
431 0 : let start = std::time::Instant::now();
432 0 : closure.await.map_err(|e| e.into())?;
433 0 : Ok(start.elapsed().as_secs_f64())
434 0 : }
435 :
436 : /// Metrics for a single timeline.
437 : #[derive(Clone)]
438 : pub struct FullTimelineInfo {
439 : pub ttid: TenantTimelineId,
440 : pub ps_feedback_count: u64,
441 : pub last_ps_feedback: PageserverFeedback,
442 : pub wal_backup_active: bool,
443 : pub timeline_is_active: bool,
444 : pub num_computes: u32,
445 : pub last_removed_segno: XLogSegNo,
446 :
447 : pub epoch_start_lsn: Lsn,
448 : pub mem_state: TimelineMemState,
449 : pub persisted_state: TimelinePersistentState,
450 :
451 : pub flush_lsn: Lsn,
452 :
453 : pub wal_storage: WalStorageMetrics,
454 : }
455 :
456 : /// Collects metrics for all active timelines.
457 : pub struct TimelineCollector {
458 : descs: Vec<Desc>,
459 : commit_lsn: GenericGaugeVec<AtomicU64>,
460 : backup_lsn: GenericGaugeVec<AtomicU64>,
461 : flush_lsn: GenericGaugeVec<AtomicU64>,
462 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
463 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
464 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
465 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
466 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
467 : ps_feedback_count: GenericGaugeVec<AtomicU64>,
468 : timeline_active: GenericGaugeVec<AtomicU64>,
469 : wal_backup_active: GenericGaugeVec<AtomicU64>,
470 : connected_computes: IntGaugeVec,
471 : disk_usage: GenericGaugeVec<AtomicU64>,
472 : acceptor_term: GenericGaugeVec<AtomicU64>,
473 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
474 : written_wal_seconds: GaugeVec,
475 : flushed_wal_seconds: GaugeVec,
476 : collect_timeline_metrics: Gauge,
477 : timelines_count: IntGauge,
478 : active_timelines_count: IntGauge,
479 : }
480 :
481 : impl Default for TimelineCollector {
482 0 : fn default() -> Self {
483 0 : Self::new()
484 0 : }
485 : }
486 :
487 : impl TimelineCollector {
488 0 : pub fn new() -> TimelineCollector {
489 0 : let mut descs = Vec::new();
490 0 :
491 0 : let commit_lsn = GenericGaugeVec::new(
492 0 : Opts::new(
493 0 : "safekeeper_commit_lsn",
494 0 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
495 0 : ),
496 0 : &["tenant_id", "timeline_id"],
497 0 : )
498 0 : .unwrap();
499 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
500 0 :
501 0 : let backup_lsn = GenericGaugeVec::new(
502 0 : Opts::new(
503 0 : "safekeeper_backup_lsn",
504 0 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
505 0 : ),
506 0 : &["tenant_id", "timeline_id"],
507 0 : )
508 0 : .unwrap();
509 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
510 0 :
511 0 : let flush_lsn = GenericGaugeVec::new(
512 0 : Opts::new(
513 0 : "safekeeper_flush_lsn",
514 0 : "Current flush_lsn, grouped by timeline",
515 0 : ),
516 0 : &["tenant_id", "timeline_id"],
517 0 : )
518 0 : .unwrap();
519 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
520 0 :
521 0 : let epoch_start_lsn = GenericGaugeVec::new(
522 0 : Opts::new(
523 0 : "safekeeper_epoch_start_lsn",
524 0 : "Point since which compute generates new WAL in the current consensus term",
525 0 : ),
526 0 : &["tenant_id", "timeline_id"],
527 0 : )
528 0 : .unwrap();
529 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
530 0 :
531 0 : let peer_horizon_lsn = GenericGaugeVec::new(
532 0 : Opts::new(
533 0 : "safekeeper_peer_horizon_lsn",
534 0 : "LSN of the most lagging safekeeper",
535 0 : ),
536 0 : &["tenant_id", "timeline_id"],
537 0 : )
538 0 : .unwrap();
539 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
540 0 :
541 0 : let remote_consistent_lsn = GenericGaugeVec::new(
542 0 : Opts::new(
543 0 : "safekeeper_remote_consistent_lsn",
544 0 : "LSN which is persisted to the remote storage in pageserver",
545 0 : ),
546 0 : &["tenant_id", "timeline_id"],
547 0 : )
548 0 : .unwrap();
549 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
550 0 :
551 0 : let ps_last_received_lsn = GenericGaugeVec::new(
552 0 : Opts::new(
553 0 : "safekeeper_ps_last_received_lsn",
554 0 : "Last LSN received by the pageserver, acknowledged in the feedback",
555 0 : ),
556 0 : &["tenant_id", "timeline_id"],
557 0 : )
558 0 : .unwrap();
559 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
560 0 :
561 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
562 0 : Opts::new(
563 0 : "safekeeper_feedback_last_time_seconds",
564 0 : "Timestamp of the last feedback from the pageserver",
565 0 : ),
566 0 : &["tenant_id", "timeline_id"],
567 0 : )
568 0 : .unwrap();
569 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
570 0 :
571 0 : let ps_feedback_count = GenericGaugeVec::new(
572 0 : Opts::new(
573 0 : "safekeeper_ps_feedback_count_total",
574 0 : "Number of feedbacks received from the pageserver",
575 0 : ),
576 0 : &["tenant_id", "timeline_id"],
577 0 : )
578 0 : .unwrap();
579 0 :
580 0 : let timeline_active = GenericGaugeVec::new(
581 0 : Opts::new(
582 0 : "safekeeper_timeline_active",
583 0 : "Reports 1 for active timelines, 0 for inactive",
584 0 : ),
585 0 : &["tenant_id", "timeline_id"],
586 0 : )
587 0 : .unwrap();
588 0 : descs.extend(timeline_active.desc().into_iter().cloned());
589 0 :
590 0 : let wal_backup_active = GenericGaugeVec::new(
591 0 : Opts::new(
592 0 : "safekeeper_wal_backup_active",
593 0 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
594 0 : ),
595 0 : &["tenant_id", "timeline_id"],
596 0 : )
597 0 : .unwrap();
598 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
599 0 :
600 0 : let connected_computes = IntGaugeVec::new(
601 0 : Opts::new(
602 0 : "safekeeper_connected_computes",
603 0 : "Number of active compute connections",
604 0 : ),
605 0 : &["tenant_id", "timeline_id"],
606 0 : )
607 0 : .unwrap();
608 0 : descs.extend(connected_computes.desc().into_iter().cloned());
609 0 :
610 0 : let disk_usage = GenericGaugeVec::new(
611 0 : Opts::new(
612 0 : "safekeeper_disk_usage_bytes",
613 0 : "Estimated disk space used to store WAL segments",
614 0 : ),
615 0 : &["tenant_id", "timeline_id"],
616 0 : )
617 0 : .unwrap();
618 0 : descs.extend(disk_usage.desc().into_iter().cloned());
619 0 :
620 0 : let acceptor_term = GenericGaugeVec::new(
621 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
622 0 : &["tenant_id", "timeline_id"],
623 0 : )
624 0 : .unwrap();
625 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
626 0 :
627 0 : let written_wal_bytes = GenericGaugeVec::new(
628 0 : Opts::new(
629 0 : "safekeeper_written_wal_bytes_total",
630 0 : "Number of WAL bytes written to disk, grouped by timeline",
631 0 : ),
632 0 : &["tenant_id", "timeline_id"],
633 0 : )
634 0 : .unwrap();
635 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
636 0 :
637 0 : let written_wal_seconds = GaugeVec::new(
638 0 : Opts::new(
639 0 : "safekeeper_written_wal_seconds_total",
640 0 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
641 0 : ),
642 0 : &["tenant_id", "timeline_id"],
643 0 : )
644 0 : .unwrap();
645 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
646 0 :
647 0 : let flushed_wal_seconds = GaugeVec::new(
648 0 : Opts::new(
649 0 : "safekeeper_flushed_wal_seconds_total",
650 0 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
651 0 : ),
652 0 : &["tenant_id", "timeline_id"],
653 0 : )
654 0 : .unwrap();
655 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
656 0 :
657 0 : let collect_timeline_metrics = Gauge::new(
658 0 : "safekeeper_collect_timeline_metrics_seconds",
659 0 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
660 0 : )
661 0 : .unwrap();
662 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
663 0 :
664 0 : let timelines_count = IntGauge::new(
665 0 : "safekeeper_timelines",
666 0 : "Total number of timelines loaded in-memory",
667 0 : )
668 0 : .unwrap();
669 0 : descs.extend(timelines_count.desc().into_iter().cloned());
670 0 :
671 0 : let active_timelines_count = IntGauge::new(
672 0 : "safekeeper_active_timelines",
673 0 : "Total number of active timelines",
674 0 : )
675 0 : .unwrap();
676 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
677 0 :
678 0 : TimelineCollector {
679 0 : descs,
680 0 : commit_lsn,
681 0 : backup_lsn,
682 0 : flush_lsn,
683 0 : epoch_start_lsn,
684 0 : peer_horizon_lsn,
685 0 : remote_consistent_lsn,
686 0 : ps_last_received_lsn,
687 0 : feedback_last_time_seconds,
688 0 : ps_feedback_count,
689 0 : timeline_active,
690 0 : wal_backup_active,
691 0 : connected_computes,
692 0 : disk_usage,
693 0 : acceptor_term,
694 0 : written_wal_bytes,
695 0 : written_wal_seconds,
696 0 : flushed_wal_seconds,
697 0 : collect_timeline_metrics,
698 0 : timelines_count,
699 0 : active_timelines_count,
700 0 : }
701 0 : }
702 : }
703 :
704 : impl Collector for TimelineCollector {
705 0 : fn desc(&self) -> Vec<&Desc> {
706 0 : self.descs.iter().collect()
707 0 : }
708 :
709 0 : fn collect(&self) -> Vec<MetricFamily> {
710 0 : let start_collecting = Instant::now();
711 0 :
712 0 : // reset all metrics to clean up inactive timelines
713 0 : self.commit_lsn.reset();
714 0 : self.backup_lsn.reset();
715 0 : self.flush_lsn.reset();
716 0 : self.epoch_start_lsn.reset();
717 0 : self.peer_horizon_lsn.reset();
718 0 : self.remote_consistent_lsn.reset();
719 0 : self.ps_last_received_lsn.reset();
720 0 : self.feedback_last_time_seconds.reset();
721 0 : self.ps_feedback_count.reset();
722 0 : self.timeline_active.reset();
723 0 : self.wal_backup_active.reset();
724 0 : self.connected_computes.reset();
725 0 : self.disk_usage.reset();
726 0 : self.acceptor_term.reset();
727 0 : self.written_wal_bytes.reset();
728 0 : self.written_wal_seconds.reset();
729 0 : self.flushed_wal_seconds.reset();
730 0 :
731 0 : let timelines_count = GlobalTimelines::get_all().len();
732 0 : let mut active_timelines_count = 0;
733 0 :
734 0 : // Prometheus Collector is sync, and data is stored under async lock. To
735 0 : // bridge the gap with a crutch, collect data in spawned thread with
736 0 : // local tokio runtime.
737 0 : let infos = std::thread::spawn(|| {
738 0 : let rt = tokio::runtime::Builder::new_current_thread()
739 0 : .build()
740 0 : .expect("failed to create rt");
741 0 : rt.block_on(collect_timeline_metrics())
742 0 : })
743 0 : .join()
744 0 : .expect("collect_timeline_metrics thread panicked");
745 :
746 0 : for tli in &infos {
747 0 : let tenant_id = tli.ttid.tenant_id.to_string();
748 0 : let timeline_id = tli.ttid.timeline_id.to_string();
749 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
750 0 :
751 0 : if tli.timeline_is_active {
752 0 : active_timelines_count += 1;
753 0 : }
754 :
755 0 : self.commit_lsn
756 0 : .with_label_values(labels)
757 0 : .set(tli.mem_state.commit_lsn.into());
758 0 : self.backup_lsn
759 0 : .with_label_values(labels)
760 0 : .set(tli.mem_state.backup_lsn.into());
761 0 : self.flush_lsn
762 0 : .with_label_values(labels)
763 0 : .set(tli.flush_lsn.into());
764 0 : self.epoch_start_lsn
765 0 : .with_label_values(labels)
766 0 : .set(tli.epoch_start_lsn.into());
767 0 : self.peer_horizon_lsn
768 0 : .with_label_values(labels)
769 0 : .set(tli.mem_state.peer_horizon_lsn.into());
770 0 : self.remote_consistent_lsn
771 0 : .with_label_values(labels)
772 0 : .set(tli.mem_state.remote_consistent_lsn.into());
773 0 : self.timeline_active
774 0 : .with_label_values(labels)
775 0 : .set(tli.timeline_is_active as u64);
776 0 : self.wal_backup_active
777 0 : .with_label_values(labels)
778 0 : .set(tli.wal_backup_active as u64);
779 0 : self.connected_computes
780 0 : .with_label_values(labels)
781 0 : .set(tli.num_computes as i64);
782 0 : self.acceptor_term
783 0 : .with_label_values(labels)
784 0 : .set(tli.persisted_state.acceptor_state.term);
785 0 : self.written_wal_bytes
786 0 : .with_label_values(labels)
787 0 : .set(tli.wal_storage.write_wal_bytes);
788 0 : self.written_wal_seconds
789 0 : .with_label_values(labels)
790 0 : .set(tli.wal_storage.write_wal_seconds);
791 0 : self.flushed_wal_seconds
792 0 : .with_label_values(labels)
793 0 : .set(tli.wal_storage.flush_wal_seconds);
794 0 :
795 0 : self.ps_last_received_lsn
796 0 : .with_label_values(labels)
797 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
798 0 : self.ps_feedback_count
799 0 : .with_label_values(labels)
800 0 : .set(tli.ps_feedback_count);
801 0 : if let Ok(unix_time) = tli
802 0 : .last_ps_feedback
803 0 : .replytime
804 0 : .duration_since(SystemTime::UNIX_EPOCH)
805 0 : {
806 0 : self.feedback_last_time_seconds
807 0 : .with_label_values(labels)
808 0 : .set(unix_time.as_secs());
809 0 : }
810 :
811 0 : if tli.last_removed_segno != 0 {
812 0 : let segno_count = tli
813 0 : .flush_lsn
814 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
815 0 : - tli.last_removed_segno;
816 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
817 0 : self.disk_usage
818 0 : .with_label_values(labels)
819 0 : .set(disk_usage_bytes);
820 0 : }
821 : }
822 :
823 : // collect MetricFamilys.
824 0 : let mut mfs = Vec::new();
825 0 : mfs.extend(self.commit_lsn.collect());
826 0 : mfs.extend(self.backup_lsn.collect());
827 0 : mfs.extend(self.flush_lsn.collect());
828 0 : mfs.extend(self.epoch_start_lsn.collect());
829 0 : mfs.extend(self.peer_horizon_lsn.collect());
830 0 : mfs.extend(self.remote_consistent_lsn.collect());
831 0 : mfs.extend(self.ps_last_received_lsn.collect());
832 0 : mfs.extend(self.feedback_last_time_seconds.collect());
833 0 : mfs.extend(self.ps_feedback_count.collect());
834 0 : mfs.extend(self.timeline_active.collect());
835 0 : mfs.extend(self.wal_backup_active.collect());
836 0 : mfs.extend(self.connected_computes.collect());
837 0 : mfs.extend(self.disk_usage.collect());
838 0 : mfs.extend(self.acceptor_term.collect());
839 0 : mfs.extend(self.written_wal_bytes.collect());
840 0 : mfs.extend(self.written_wal_seconds.collect());
841 0 : mfs.extend(self.flushed_wal_seconds.collect());
842 0 :
843 0 : // report time it took to collect all info
844 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
845 0 : self.collect_timeline_metrics.set(elapsed);
846 0 : mfs.extend(self.collect_timeline_metrics.collect());
847 0 :
848 0 : // report total number of timelines
849 0 : self.timelines_count.set(timelines_count as i64);
850 0 : mfs.extend(self.timelines_count.collect());
851 0 :
852 0 : self.active_timelines_count
853 0 : .set(active_timelines_count as i64);
854 0 : mfs.extend(self.active_timelines_count.collect());
855 0 :
856 0 : mfs
857 0 : }
858 : }
859 :
860 0 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
861 0 : let mut res = vec![];
862 0 : let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
863 :
864 0 : for tli in active_timelines {
865 0 : if let Some(info) = tli.info_for_metrics().await {
866 0 : res.push(info);
867 0 : }
868 : }
869 0 : res
870 0 : }
|