Line data Source code
1 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::sync::{Arc, RwLock};
4 : use std::time::{Instant, SystemTime};
5 :
6 : use anyhow::Result;
7 : use futures::Future;
8 : use metrics::core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts};
9 : use metrics::proto::MetricFamily;
10 : use metrics::{
11 : DISK_FSYNC_SECONDS_BUCKETS, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
12 : IntCounterPair, IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, pow2_buckets,
13 : register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
14 : register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
15 : register_int_gauge_vec,
16 : };
17 : use once_cell::sync::Lazy;
18 : use postgres_ffi::XLogSegNo;
19 : use utils::id::TenantTimelineId;
20 : use utils::lsn::Lsn;
21 : use utils::pageserver_feedback::PageserverFeedback;
22 :
23 : use crate::GlobalTimelines;
24 : use crate::receive_wal::MSG_QUEUE_SIZE;
25 : use crate::state::{TimelineMemState, TimelinePersistentState};
26 :
27 : // Global metrics across all timelines.
28 5 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
29 5 : register_histogram!(
30 : "safekeeper_write_wal_bytes",
31 : "Bytes written to WAL in a single request",
32 5 : vec![
33 : 1.0,
34 : 10.0,
35 : 100.0,
36 : 1024.0,
37 : 8192.0,
38 5 : 128.0 * 1024.0,
39 5 : 1024.0 * 1024.0,
40 5 : 10.0 * 1024.0 * 1024.0
41 : ]
42 : )
43 5 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
44 5 : });
45 5 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
46 5 : register_histogram!(
47 : "safekeeper_write_wal_seconds",
48 : "Seconds spent writing and syncing WAL to a disk in a single request",
49 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
50 : )
51 5 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
52 5 : });
53 5 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
54 5 : register_histogram!(
55 : "safekeeper_flush_wal_seconds",
56 : "Seconds spent syncing WAL to a disk (excluding segment initialization)",
57 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
58 : )
59 5 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
60 5 : });
61 : /* BEGIN_HADRON */
62 : // Counter of all ProposerAcceptorMessage requests received
63 17 : pub static PROPOSER_ACCEPTOR_MESSAGES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
64 17 : register_int_counter_vec!(
65 : "safekeeper_proposer_acceptor_messages_total",
66 : "Total number of ProposerAcceptorMessage requests received by the Safekeeper.",
67 17 : &["outcome"]
68 : )
69 17 : .expect("Failed to register safekeeper_proposer_acceptor_messages_total counter")
70 17 : });
71 0 : pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
72 0 : register_int_counter!(
73 : "safekeeper_wal_disk_io_errors",
74 : "Number of disk I/O errors when creating and flushing WALs and control files"
75 : )
76 0 : .expect("Failed to register safekeeper_wal_disk_io_errors counter")
77 0 : });
78 0 : pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
79 0 : register_int_counter!(
80 : "safekeeper_wal_storage_limit_errors",
81 : concat!(
82 : "Number of errors due to timeline WAL storage utilization exceeding configured limit. ",
83 : "An increase in this metric indicates issues backing up or removing WALs."
84 : )
85 : )
86 0 : .expect("Failed to register safekeeper_wal_storage_limit_errors counter")
87 0 : });
88 0 : pub static SK_RECOVERY_PULL_TIMELINE_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
89 0 : register_int_counter!(
90 : "safekeeper_recovery_pull_timeline_errors",
91 : concat!(
92 : "Number of errors due to pull_timeline errors during SK lost disk recovery.",
93 : "An increase in this metric indicates pull timelines runs into error."
94 : )
95 : )
96 0 : .expect("Failed to register safekeeper_recovery_pull_timeline_errors counter")
97 0 : });
98 0 : pub static SK_RECOVERY_PULL_TIMELINE_OKS: Lazy<IntCounter> = Lazy::new(|| {
99 0 : register_int_counter!(
100 : "safekeeper_recovery_pull_timeline_oks",
101 : concat!(
102 : "Number of successful pull_timeline during SK lost disk recovery.",
103 : "An increase in this metric indicates pull timelines is successful."
104 : )
105 : )
106 0 : .expect("Failed to register safekeeper_recovery_pull_timeline_oks counter")
107 0 : });
108 0 : pub static SK_RECOVERY_PULL_TIMELINES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
109 0 : register_histogram!(
110 : "safekeeper_recovery_pull_timelines_seconds",
111 : "Seconds to pull timelines",
112 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
113 : )
114 0 : .expect("Failed to register safekeeper_recovery_pull_timelines_seconds histogram")
115 0 : });
116 0 : pub static SK_RECOVERY_PULL_TIMELINE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
117 0 : register_histogram_vec!(
118 : "safekeeper_recovery_pull_timeline_seconds",
119 : "Seconds to pull timeline",
120 0 : &["tenant_id", "timeline_id"],
121 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
122 : )
123 0 : .expect("Failed to register safekeeper_recovery_pull_timeline_seconds histogram vec")
124 0 : });
125 : /* END_HADRON */
126 7 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
127 7 : register_histogram!(
128 : "safekeeper_persist_control_file_seconds",
129 : "Seconds to persist and sync control file",
130 7 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
131 : )
132 7 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
133 7 : });
134 5 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
135 5 : register_histogram_vec!(
136 : "safekeeper_wal_storage_operation_seconds",
137 : "Seconds spent on WAL storage operations",
138 5 : &["operation"],
139 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
140 : )
141 5 : .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
142 5 : });
143 13 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
144 13 : register_histogram_vec!(
145 : "safekeeper_misc_operation_seconds",
146 : "Seconds spent on miscellaneous operations",
147 13 : &["operation"],
148 13 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
149 : )
150 13 : .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
151 13 : });
152 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
153 0 : register_int_counter_vec!(
154 : "safekeeper_pg_io_bytes_total",
155 : "Bytes read from or written to any PostgreSQL connection",
156 0 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
157 : )
158 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
159 0 : });
160 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
161 0 : register_int_counter!(
162 : "safekeeper_broker_pushed_updates_total",
163 : "Number of timeline updates pushed to the broker"
164 : )
165 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
166 0 : });
167 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
168 0 : register_int_counter_vec!(
169 : "safekeeper_broker_pulled_updates_total",
170 : "Number of timeline updates pulled and processed from the broker",
171 0 : &["result"]
172 : )
173 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
174 0 : });
175 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
176 0 : register_int_counter_pair_vec!(
177 : "safekeeper_pg_queries_received_total",
178 : "Number of queries received through pg protocol",
179 : "safekeeper_pg_queries_finished_total",
180 : "Number of queries finished through pg protocol",
181 0 : &["query"]
182 : )
183 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
184 0 : });
185 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
186 0 : register_int_counter!(
187 : "safekeeper_removed_wal_segments_total",
188 : "Number of WAL segments removed from the disk"
189 : )
190 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
191 0 : });
192 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
193 0 : register_int_counter!(
194 : "safekeeper_backed_up_segments_total",
195 : "Number of WAL segments backed up to the S3"
196 : )
197 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
198 0 : });
199 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
200 0 : register_int_counter!(
201 : "safekeeper_backup_errors_total",
202 : "Number of errors during backup"
203 : )
204 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
205 0 : });
206 : /* BEGIN_HADRON */
207 0 : pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
208 0 : register_int_counter!(
209 : "safekeeper_backup_reelect_leader_total",
210 : "Number of times the backup leader was reelected"
211 : )
212 0 : .expect("Failed to register safekeeper_backup_reelect_leader_total counter")
213 0 : });
214 : /* END_HADRON */
215 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
216 0 : register_histogram!(
217 : "safekeeper_broker_push_update_seconds",
218 : "Seconds to push all timeline updates to the broker",
219 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
220 : )
221 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
222 0 : });
/// Histogram bucket upper bounds for metrics that count timelines (1 through 50,000).
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1.0,
    10.0,
    50.0,
    100.0,
    200.0,
    500.0,
    1_000.0,
    2_000.0,
    5_000.0,
    10_000.0,
    20_000.0,
    50_000.0,
];
226 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
227 0 : register_histogram!(
228 : "safekeeper_broker_iteration_timelines",
229 : "Count of timelines pushed to the broker in a single iteration",
230 0 : TIMELINES_COUNT_BUCKETS.to_vec()
231 : )
232 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
233 0 : });
234 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
235 0 : register_int_counter!(
236 : "safekeeper_received_ps_feedbacks_total",
237 : "Number of pageserver feedbacks received"
238 : )
239 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
240 0 : });
241 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
242 0 : register_int_counter_vec!(
243 : "safekeeper_partial_backup_uploads_total",
244 : "Number of partial backup uploads to the S3",
245 0 : &["result"]
246 : )
247 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
248 0 : });
249 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
250 0 : register_int_counter!(
251 : "safekeeper_partial_backup_uploaded_bytes_total",
252 : "Number of bytes uploaded to the S3 during partial backup"
253 : )
254 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
255 0 : });
256 5 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
257 5 : register_int_counter!(
258 : "safekeeper_manager_iterations_total",
259 : "Number of iterations of the timeline manager task"
260 : )
261 5 : .expect("Failed to register safekeeper_manager_iterations_total counter")
262 5 : });
263 5 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
264 5 : register_int_counter!(
265 : "safekeeper_manager_active_changes_total",
266 : "Number of timeline active status changes in the timeline manager task"
267 : )
268 5 : .expect("Failed to register safekeeper_manager_active_changes_total counter")
269 5 : });
270 0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
271 0 : register_int_counter_pair!(
272 : "safekeeper_wal_backup_tasks_started_total",
273 : "Number of active WAL backup tasks",
274 : "safekeeper_wal_backup_tasks_finished_total",
275 : "Number of finished WAL backup tasks",
276 : )
277 0 : .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
278 0 : });
279 5 : pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
280 5 : register_int_gauge!(
281 : "safekeeper_wal_receivers",
282 : "Number of currently connected WAL receivers (i.e. connected computes)"
283 : )
284 5 : .expect("Failed to register safekeeper_wal_receivers")
285 5 : });
286 4 : pub static WAL_READERS: Lazy<IntGaugeVec> = Lazy::new(|| {
287 4 : register_int_gauge_vec!(
288 : "safekeeper_wal_readers",
289 : "Number of active WAL readers (may serve pageservers or other safekeepers)",
290 4 : &["kind", "target"]
291 : )
292 4 : .expect("Failed to register safekeeper_wal_receivers")
293 4 : });
294 5 : pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
295 : // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
296 : // full queues respectively.
297 5 : let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
298 5 : buckets.insert(0, 0.0);
299 5 : buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
300 5 : assert!(buckets.len() <= 12, "too many histogram buckets");
301 :
302 5 : register_histogram!(
303 : "safekeeper_wal_receiver_queue_depth",
304 : "Number of queued messages per WAL receiver (sampled every 5 seconds)",
305 5 : buckets
306 : )
307 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
308 5 : });
309 5 : pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
310 5 : register_int_gauge!(
311 : "safekeeper_wal_receiver_queue_depth_total",
312 : "Total number of queued messages across all WAL receivers",
313 : )
314 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
315 5 : });
316 : // TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
317 : // MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
318 5 : pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
319 5 : register_int_gauge!(
320 : "safekeeper_wal_receiver_queue_size_total",
321 : "Total memory byte size of queued messages across all WAL receivers",
322 : )
323 5 : .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
324 5 : });
325 :
326 : // Metrics collected on operations on the storage repository.
327 : #[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
328 : #[strum(serialize_all = "kebab_case")]
329 : pub(crate) enum EvictionEvent {
330 : Evict,
331 : Restore,
332 : }
333 :
334 0 : pub(crate) static EVICTION_EVENTS_STARTED: Lazy<IntCounterVec> = Lazy::new(|| {
335 0 : register_int_counter_vec!(
336 : "safekeeper_eviction_events_started_total",
337 : "Number of eviction state changes, incremented when they start",
338 0 : &["kind"]
339 : )
340 0 : .expect("Failed to register metric")
341 0 : });
342 :
343 0 : pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(|| {
344 0 : register_int_counter_vec!(
345 : "safekeeper_eviction_events_completed_total",
346 : "Number of eviction state changes, incremented when they complete",
347 0 : &["kind"]
348 : )
349 0 : .expect("Failed to register metric")
350 0 : });
351 :
352 0 : pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
353 0 : register_int_gauge!(
354 : "safekeeper_evicted_timelines",
355 : "Number of currently evicted timelines"
356 : )
357 0 : .expect("Failed to register metric")
358 0 : });
359 :
/// Placeholder label value used while the real value of a connection label is not known.
pub const LABEL_UNKNOWN: &'static str = "unknown";
361 :
/// Label values attached to the traffic counters of one connection.
#[derive(Clone)]
struct ConnectionLabels {
    /// Availability zone the connection originates from.
    client_az: String,
    /// Availability zone this safekeeper runs in.
    sk_az: String,
    /// Application name reported by the client.
    app_name: String,
}
372 :
373 : impl ConnectionLabels {
374 0 : fn new() -> Self {
375 0 : Self {
376 0 : client_az: LABEL_UNKNOWN.to_string(),
377 0 : sk_az: LABEL_UNKNOWN.to_string(),
378 0 : app_name: LABEL_UNKNOWN.to_string(),
379 0 : }
380 0 : }
381 :
382 0 : fn build_metrics(
383 0 : &self,
384 0 : ) -> (
385 0 : GenericCounter<metrics::core::AtomicU64>,
386 0 : GenericCounter<metrics::core::AtomicU64>,
387 0 : ) {
388 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
389 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
390 0 : (client_az, sk_az) => {
391 0 : if client_az == sk_az {
392 0 : "true"
393 : } else {
394 0 : "false"
395 : }
396 : }
397 : };
398 :
399 0 : let read = PG_IO_BYTES.with_label_values(&[
400 0 : &self.client_az,
401 0 : &self.sk_az,
402 0 : &self.app_name,
403 0 : "read",
404 0 : same_az,
405 0 : ]);
406 0 : let write = PG_IO_BYTES.with_label_values(&[
407 0 : &self.client_az,
408 0 : &self.sk_az,
409 0 : &self.app_name,
410 0 : "write",
411 0 : same_az,
412 0 : ]);
413 0 : (read, write)
414 0 : }
415 : }
416 :
417 : struct TrafficMetricsState {
418 : /// Labels for traffic metrics.
419 : labels: ConnectionLabels,
420 : /// Total bytes read from this connection.
421 : read: GenericCounter<metrics::core::AtomicU64>,
422 : /// Total bytes written to this connection.
423 : write: GenericCounter<metrics::core::AtomicU64>,
424 : }
425 :
426 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
427 : #[derive(Clone)]
428 : pub struct TrafficMetrics {
429 : state: Arc<RwLock<TrafficMetricsState>>,
430 : }
431 :
432 : impl Default for TrafficMetrics {
433 0 : fn default() -> Self {
434 0 : Self::new()
435 0 : }
436 : }
437 :
438 : impl TrafficMetrics {
439 0 : pub fn new() -> Self {
440 0 : let labels = ConnectionLabels::new();
441 0 : let (read, write) = labels.build_metrics();
442 0 : let state = TrafficMetricsState {
443 0 : labels,
444 0 : read,
445 0 : write,
446 0 : };
447 0 : Self {
448 0 : state: Arc::new(RwLock::new(state)),
449 0 : }
450 0 : }
451 :
452 0 : pub fn set_client_az(&self, value: &str) {
453 0 : let mut state = self.state.write().unwrap();
454 0 : state.labels.client_az = value.to_string();
455 0 : (state.read, state.write) = state.labels.build_metrics();
456 0 : }
457 :
458 0 : pub fn set_sk_az(&self, value: &str) {
459 0 : let mut state = self.state.write().unwrap();
460 0 : state.labels.sk_az = value.to_string();
461 0 : (state.read, state.write) = state.labels.build_metrics();
462 0 : }
463 :
464 0 : pub fn set_app_name(&self, value: &str) {
465 0 : let mut state = self.state.write().unwrap();
466 0 : state.labels.app_name = value.to_string();
467 0 : (state.read, state.write) = state.labels.build_metrics();
468 0 : }
469 :
470 0 : pub fn observe_read(&self, cnt: usize) {
471 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
472 0 : }
473 :
474 0 : pub fn observe_write(&self, cnt: usize) {
475 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
476 0 : }
477 : }
478 :
/// Running WAL storage totals for a single timeline; all totals start at zero.
#[derive(Clone, Default)]
pub struct WalStorageMetrics {
    /// Total number of bytes written.
    write_wal_bytes: u64,
    /// Total seconds spent writing WAL to disk, waiting for write(2).
    write_wal_seconds: f64,
    /// Total seconds spent syncing WAL to disk, waiting for fsync(2).
    flush_wal_seconds: f64,
}
489 :
490 : impl WalStorageMetrics {
491 620 : pub fn observe_write_bytes(&mut self, bytes: usize) {
492 620 : self.write_wal_bytes += bytes as u64;
493 620 : WRITE_WAL_BYTES.observe(bytes as f64);
494 620 : }
495 :
496 620 : pub fn observe_write_seconds(&mut self, seconds: f64) {
497 620 : self.write_wal_seconds += seconds;
498 620 : WRITE_WAL_SECONDS.observe(seconds);
499 620 : }
500 :
501 625 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
502 625 : self.flush_wal_seconds += seconds;
503 625 : FLUSH_WAL_SECONDS.observe(seconds);
504 625 : }
505 : }
506 :
507 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
508 1245 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
509 1245 : closure: impl Future<Output = Result<(), E>>,
510 1245 : ) -> Result<f64> {
511 1245 : let start = std::time::Instant::now();
512 1245 : closure.await.map_err(|e| e.into())?;
513 1245 : Ok(start.elapsed().as_secs_f64())
514 1245 : }
515 :
516 : /// Metrics for a single timeline.
517 : #[derive(Clone)]
518 : pub struct FullTimelineInfo {
519 : pub ttid: TenantTimelineId,
520 : pub ps_feedback_count: u64,
521 : pub ps_corruption_detected: bool,
522 : pub last_ps_feedback: PageserverFeedback,
523 : pub wal_backup_active: bool,
524 : pub timeline_is_active: bool,
525 : pub num_computes: u32,
526 : pub last_removed_segno: XLogSegNo,
527 : pub interpreted_wal_reader_tasks: usize,
528 :
529 : pub epoch_start_lsn: Lsn,
530 : pub mem_state: TimelineMemState,
531 : pub persisted_state: TimelinePersistentState,
532 :
533 : pub flush_lsn: Lsn,
534 :
535 : pub wal_storage: WalStorageMetrics,
536 : }
537 :
538 : /// Collects metrics for all active timelines.
539 : pub struct TimelineCollector {
540 : global_timelines: Arc<GlobalTimelines>,
541 : descs: Vec<Desc>,
542 : commit_lsn: GenericGaugeVec<AtomicU64>,
543 : backup_lsn: GenericGaugeVec<AtomicU64>,
544 : flush_lsn: GenericGaugeVec<AtomicU64>,
545 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
546 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
547 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
548 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
549 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
550 : ps_feedback_count: GenericGaugeVec<AtomicU64>,
551 : ps_corruption_detected: IntGaugeVec,
552 : timeline_active: GenericGaugeVec<AtomicU64>,
553 : wal_backup_active: GenericGaugeVec<AtomicU64>,
554 : connected_computes: IntGaugeVec,
555 : disk_usage: GenericGaugeVec<AtomicU64>,
556 : acceptor_term: GenericGaugeVec<AtomicU64>,
557 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
558 : interpreted_wal_reader_tasks: GenericGaugeVec<AtomicU64>,
559 : written_wal_seconds: GaugeVec,
560 : flushed_wal_seconds: GaugeVec,
561 : collect_timeline_metrics: Gauge,
562 : timelines_count: IntGauge,
563 : active_timelines_count: IntGauge,
564 : }
565 :
566 : impl TimelineCollector {
567 0 : pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
568 0 : let mut descs = Vec::new();
569 :
570 0 : let commit_lsn = GenericGaugeVec::new(
571 0 : Opts::new(
572 : "safekeeper_commit_lsn",
573 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
574 : ),
575 0 : &["tenant_id", "timeline_id"],
576 : )
577 0 : .unwrap();
578 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
579 :
580 0 : let backup_lsn = GenericGaugeVec::new(
581 0 : Opts::new(
582 : "safekeeper_backup_lsn",
583 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
584 : ),
585 0 : &["tenant_id", "timeline_id"],
586 : )
587 0 : .unwrap();
588 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
589 :
590 0 : let flush_lsn = GenericGaugeVec::new(
591 0 : Opts::new(
592 : "safekeeper_flush_lsn",
593 : "Current flush_lsn, grouped by timeline",
594 : ),
595 0 : &["tenant_id", "timeline_id"],
596 : )
597 0 : .unwrap();
598 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
599 :
600 0 : let epoch_start_lsn = GenericGaugeVec::new(
601 0 : Opts::new(
602 : "safekeeper_epoch_start_lsn",
603 : "Point since which compute generates new WAL in the current consensus term",
604 : ),
605 0 : &["tenant_id", "timeline_id"],
606 : )
607 0 : .unwrap();
608 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
609 :
610 0 : let peer_horizon_lsn = GenericGaugeVec::new(
611 0 : Opts::new(
612 : "safekeeper_peer_horizon_lsn",
613 : "LSN of the most lagging safekeeper",
614 : ),
615 0 : &["tenant_id", "timeline_id"],
616 : )
617 0 : .unwrap();
618 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
619 :
620 0 : let remote_consistent_lsn = GenericGaugeVec::new(
621 0 : Opts::new(
622 : "safekeeper_remote_consistent_lsn",
623 : "LSN which is persisted to the remote storage in pageserver",
624 : ),
625 0 : &["tenant_id", "timeline_id"],
626 : )
627 0 : .unwrap();
628 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
629 :
630 0 : let ps_last_received_lsn = GenericGaugeVec::new(
631 0 : Opts::new(
632 : "safekeeper_ps_last_received_lsn",
633 : "Last LSN received by the pageserver, acknowledged in the feedback",
634 : ),
635 0 : &["tenant_id", "timeline_id"],
636 : )
637 0 : .unwrap();
638 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
639 :
640 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
641 0 : Opts::new(
642 : "safekeeper_feedback_last_time_seconds",
643 : "Timestamp of the last feedback from the pageserver",
644 : ),
645 0 : &["tenant_id", "timeline_id"],
646 : )
647 0 : .unwrap();
648 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
649 :
650 0 : let ps_feedback_count = GenericGaugeVec::new(
651 0 : Opts::new(
652 : "safekeeper_ps_feedback_count_total",
653 : "Number of feedbacks received from the pageserver",
654 : ),
655 0 : &["tenant_id", "timeline_id"],
656 : )
657 0 : .unwrap();
658 :
659 0 : let ps_corruption_detected = IntGaugeVec::new(
660 0 : Opts::new(
661 : "safekeeper_ps_corruption_detected",
662 : "1 if corruption was detected in the timeline according to feedback from the pageserver, 0 otherwise",
663 : ),
664 0 : &["tenant_id", "timeline_id"],
665 : )
666 0 : .unwrap();
667 :
668 0 : let timeline_active = GenericGaugeVec::new(
669 0 : Opts::new(
670 : "safekeeper_timeline_active",
671 : "Reports 1 for active timelines, 0 for inactive",
672 : ),
673 0 : &["tenant_id", "timeline_id"],
674 : )
675 0 : .unwrap();
676 0 : descs.extend(timeline_active.desc().into_iter().cloned());
677 :
678 0 : let wal_backup_active = GenericGaugeVec::new(
679 0 : Opts::new(
680 : "safekeeper_wal_backup_active",
681 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
682 : ),
683 0 : &["tenant_id", "timeline_id"],
684 : )
685 0 : .unwrap();
686 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
687 :
688 0 : let connected_computes = IntGaugeVec::new(
689 0 : Opts::new(
690 : "safekeeper_connected_computes",
691 : "Number of active compute connections",
692 : ),
693 0 : &["tenant_id", "timeline_id"],
694 : )
695 0 : .unwrap();
696 0 : descs.extend(connected_computes.desc().into_iter().cloned());
697 :
698 0 : let disk_usage = GenericGaugeVec::new(
699 0 : Opts::new(
700 : "safekeeper_disk_usage_bytes",
701 : "Estimated disk space used to store WAL segments",
702 : ),
703 0 : &["tenant_id", "timeline_id"],
704 : )
705 0 : .unwrap();
706 0 : descs.extend(disk_usage.desc().into_iter().cloned());
707 :
708 0 : let acceptor_term = GenericGaugeVec::new(
709 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
710 0 : &["tenant_id", "timeline_id"],
711 : )
712 0 : .unwrap();
713 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
714 :
715 0 : let written_wal_bytes = GenericGaugeVec::new(
716 0 : Opts::new(
717 : "safekeeper_written_wal_bytes_total",
718 : "Number of WAL bytes written to disk, grouped by timeline",
719 : ),
720 0 : &["tenant_id", "timeline_id"],
721 : )
722 0 : .unwrap();
723 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
724 :
725 0 : let written_wal_seconds = GaugeVec::new(
726 0 : Opts::new(
727 : "safekeeper_written_wal_seconds_total",
728 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
729 : ),
730 0 : &["tenant_id", "timeline_id"],
731 : )
732 0 : .unwrap();
733 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
734 :
735 0 : let flushed_wal_seconds = GaugeVec::new(
736 0 : Opts::new(
737 : "safekeeper_flushed_wal_seconds_total",
738 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
739 : ),
740 0 : &["tenant_id", "timeline_id"],
741 : )
742 0 : .unwrap();
743 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
744 :
745 0 : let collect_timeline_metrics = Gauge::new(
746 : "safekeeper_collect_timeline_metrics_seconds",
747 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
748 : )
749 0 : .unwrap();
750 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
751 :
752 0 : let timelines_count = IntGauge::new(
753 : "safekeeper_timelines",
754 : "Total number of timelines loaded in-memory",
755 : )
756 0 : .unwrap();
757 0 : descs.extend(timelines_count.desc().into_iter().cloned());
758 :
759 0 : let active_timelines_count = IntGauge::new(
760 : "safekeeper_active_timelines",
761 : "Total number of active timelines",
762 : )
763 0 : .unwrap();
764 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
765 :
766 0 : let interpreted_wal_reader_tasks = GenericGaugeVec::new(
767 0 : Opts::new(
768 : "safekeeper_interpreted_wal_reader_tasks",
769 : "Number of active interpreted wal reader tasks, grouped by timeline",
770 : ),
771 0 : &["tenant_id", "timeline_id"],
772 : )
773 0 : .unwrap();
774 0 : descs.extend(interpreted_wal_reader_tasks.desc().into_iter().cloned());
775 :
776 0 : TimelineCollector {
777 0 : global_timelines,
778 0 : descs,
779 0 : commit_lsn,
780 0 : backup_lsn,
781 0 : flush_lsn,
782 0 : epoch_start_lsn,
783 0 : peer_horizon_lsn,
784 0 : remote_consistent_lsn,
785 0 : ps_last_received_lsn,
786 0 : feedback_last_time_seconds,
787 0 : ps_feedback_count,
788 0 : ps_corruption_detected,
789 0 : timeline_active,
790 0 : wal_backup_active,
791 0 : connected_computes,
792 0 : disk_usage,
793 0 : acceptor_term,
794 0 : written_wal_bytes,
795 0 : written_wal_seconds,
796 0 : flushed_wal_seconds,
797 0 : collect_timeline_metrics,
798 0 : timelines_count,
799 0 : active_timelines_count,
800 0 : interpreted_wal_reader_tasks,
801 0 : }
802 0 : }
803 : }
804 :
805 : impl Collector for TimelineCollector {
806 0 : fn desc(&self) -> Vec<&Desc> {
807 0 : self.descs.iter().collect()
808 0 : }
809 :
810 0 : fn collect(&self) -> Vec<MetricFamily> {
811 0 : let start_collecting = Instant::now();
812 :
813 : // reset all metrics to clean up inactive timelines
814 0 : self.commit_lsn.reset();
815 0 : self.backup_lsn.reset();
816 0 : self.flush_lsn.reset();
817 0 : self.epoch_start_lsn.reset();
818 0 : self.peer_horizon_lsn.reset();
819 0 : self.remote_consistent_lsn.reset();
820 0 : self.ps_last_received_lsn.reset();
821 0 : self.feedback_last_time_seconds.reset();
822 0 : self.ps_feedback_count.reset();
823 0 : self.timeline_active.reset();
824 0 : self.wal_backup_active.reset();
825 0 : self.connected_computes.reset();
826 0 : self.disk_usage.reset();
827 0 : self.acceptor_term.reset();
828 0 : self.written_wal_bytes.reset();
829 0 : self.interpreted_wal_reader_tasks.reset();
830 0 : self.written_wal_seconds.reset();
831 0 : self.flushed_wal_seconds.reset();
832 :
833 0 : let timelines_count = self.global_timelines.get_all().len();
834 0 : let mut active_timelines_count = 0;
835 :
836 : // Prometheus Collector is sync, and data is stored under async lock. To
837 : // bridge the gap with a crutch, collect data in spawned thread with
838 : // local tokio runtime.
839 0 : let global_timelines = self.global_timelines.clone();
840 0 : let infos = std::thread::spawn(|| {
841 0 : let rt = tokio::runtime::Builder::new_current_thread()
842 0 : .build()
843 0 : .expect("failed to create rt");
844 0 : rt.block_on(collect_timeline_metrics(global_timelines))
845 0 : })
846 0 : .join()
847 0 : .expect("collect_timeline_metrics thread panicked");
848 :
849 0 : for tli in &infos {
850 0 : let tenant_id = tli.ttid.tenant_id.to_string();
851 0 : let timeline_id = tli.ttid.timeline_id.to_string();
852 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
853 :
854 0 : if tli.timeline_is_active {
855 0 : active_timelines_count += 1;
856 0 : }
857 :
858 0 : self.commit_lsn
859 0 : .with_label_values(labels)
860 0 : .set(tli.mem_state.commit_lsn.into());
861 0 : self.backup_lsn
862 0 : .with_label_values(labels)
863 0 : .set(tli.mem_state.backup_lsn.into());
864 0 : self.flush_lsn
865 0 : .with_label_values(labels)
866 0 : .set(tli.flush_lsn.into());
867 0 : self.epoch_start_lsn
868 0 : .with_label_values(labels)
869 0 : .set(tli.epoch_start_lsn.into());
870 0 : self.peer_horizon_lsn
871 0 : .with_label_values(labels)
872 0 : .set(tli.mem_state.peer_horizon_lsn.into());
873 0 : self.remote_consistent_lsn
874 0 : .with_label_values(labels)
875 0 : .set(tli.mem_state.remote_consistent_lsn.into());
876 0 : self.timeline_active
877 0 : .with_label_values(labels)
878 0 : .set(tli.timeline_is_active as u64);
879 0 : self.wal_backup_active
880 0 : .with_label_values(labels)
881 0 : .set(tli.wal_backup_active as u64);
882 0 : self.connected_computes
883 0 : .with_label_values(labels)
884 0 : .set(tli.num_computes as i64);
885 0 : self.acceptor_term
886 0 : .with_label_values(labels)
887 0 : .set(tli.persisted_state.acceptor_state.term);
888 0 : self.written_wal_bytes
889 0 : .with_label_values(labels)
890 0 : .set(tli.wal_storage.write_wal_bytes);
891 0 : self.interpreted_wal_reader_tasks
892 0 : .with_label_values(labels)
893 0 : .set(tli.interpreted_wal_reader_tasks as u64);
894 0 : self.written_wal_seconds
895 0 : .with_label_values(labels)
896 0 : .set(tli.wal_storage.write_wal_seconds);
897 0 : self.flushed_wal_seconds
898 0 : .with_label_values(labels)
899 0 : .set(tli.wal_storage.flush_wal_seconds);
900 :
901 0 : self.ps_last_received_lsn
902 0 : .with_label_values(labels)
903 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
904 0 : self.ps_feedback_count
905 0 : .with_label_values(labels)
906 0 : .set(tli.ps_feedback_count);
907 0 : self.ps_corruption_detected
908 0 : .with_label_values(labels)
909 0 : .set(tli.ps_corruption_detected as i64);
910 0 : if let Ok(unix_time) = tli
911 0 : .last_ps_feedback
912 0 : .replytime
913 0 : .duration_since(SystemTime::UNIX_EPOCH)
914 0 : {
915 0 : self.feedback_last_time_seconds
916 0 : .with_label_values(labels)
917 0 : .set(unix_time.as_secs());
918 0 : }
919 :
920 0 : if tli.last_removed_segno != 0 {
921 0 : let segno_count = tli
922 0 : .flush_lsn
923 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
924 0 : - tli.last_removed_segno;
925 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
926 0 : self.disk_usage
927 0 : .with_label_values(labels)
928 0 : .set(disk_usage_bytes);
929 0 : }
930 : }
931 :
932 : // collect MetricFamilys.
933 0 : let mut mfs = Vec::new();
934 0 : mfs.extend(self.commit_lsn.collect());
935 0 : mfs.extend(self.backup_lsn.collect());
936 0 : mfs.extend(self.flush_lsn.collect());
937 0 : mfs.extend(self.epoch_start_lsn.collect());
938 0 : mfs.extend(self.peer_horizon_lsn.collect());
939 0 : mfs.extend(self.remote_consistent_lsn.collect());
940 0 : mfs.extend(self.ps_last_received_lsn.collect());
941 0 : mfs.extend(self.feedback_last_time_seconds.collect());
942 0 : mfs.extend(self.ps_feedback_count.collect());
943 0 : mfs.extend(self.ps_corruption_detected.collect());
944 0 : mfs.extend(self.timeline_active.collect());
945 0 : mfs.extend(self.wal_backup_active.collect());
946 0 : mfs.extend(self.connected_computes.collect());
947 0 : mfs.extend(self.disk_usage.collect());
948 0 : mfs.extend(self.acceptor_term.collect());
949 0 : mfs.extend(self.written_wal_bytes.collect());
950 0 : mfs.extend(self.interpreted_wal_reader_tasks.collect());
951 0 : mfs.extend(self.written_wal_seconds.collect());
952 0 : mfs.extend(self.flushed_wal_seconds.collect());
953 :
954 : // report time it took to collect all info
955 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
956 0 : self.collect_timeline_metrics.set(elapsed);
957 0 : mfs.extend(self.collect_timeline_metrics.collect());
958 :
959 : // report total number of timelines
960 0 : self.timelines_count.set(timelines_count as i64);
961 0 : mfs.extend(self.timelines_count.collect());
962 :
963 0 : self.active_timelines_count
964 0 : .set(active_timelines_count as i64);
965 0 : mfs.extend(self.active_timelines_count.collect());
966 :
967 0 : mfs
968 0 : }
969 : }
970 :
971 0 : async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
972 0 : let mut res = vec![];
973 0 : let active_timelines = global_timelines.get_global_broker_active_set().get_all();
974 :
975 0 : for tli in active_timelines {
976 0 : if let Some(info) = tli.info_for_metrics().await {
977 0 : res.push(info);
978 0 : }
979 : }
980 0 : res
981 0 : }
982 :
983 : /* BEGIN_HADRON */
984 : // Metrics reporting the time spent to perform each safekeeper filesystem utilization check.
985 0 : pub static GLOBAL_DISK_UTIL_CHECK_SECONDS: Lazy<Histogram> = Lazy::new(|| {
986 : // Buckets from 1ms up to 10s
987 0 : let buckets = vec![0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0];
988 0 : register_histogram!(
989 : "safekeeper_global_disk_utilization_check_seconds",
990 : "Seconds spent to perform each safekeeper filesystem utilization check",
991 0 : buckets
992 : )
993 0 : .expect("Failed to register safekeeper_global_disk_utilization_check_seconds histogram")
994 0 : });
995 : /* END_HADRON */
|