Line data Source code
1 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::sync::{Arc, RwLock};
4 : use std::time::{Instant, SystemTime};
5 :
6 : use anyhow::Result;
7 : use futures::Future;
8 : use metrics::core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts};
9 : use metrics::proto::MetricFamily;
10 : use metrics::{
11 : DISK_FSYNC_SECONDS_BUCKETS, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
12 : IntCounterPair, IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, pow2_buckets,
13 : register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
14 : register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
15 : register_int_gauge_vec,
16 : };
17 : use once_cell::sync::Lazy;
18 : use postgres_ffi::XLogSegNo;
19 : use utils::id::TenantTimelineId;
20 : use utils::lsn::Lsn;
21 : use utils::pageserver_feedback::PageserverFeedback;
22 :
23 : use crate::GlobalTimelines;
24 : use crate::receive_wal::MSG_QUEUE_SIZE;
25 : use crate::state::{TimelineMemState, TimelinePersistentState};
26 :
27 : // Global metrics across all timelines.
28 5 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
29 5 : register_histogram!(
30 : "safekeeper_write_wal_bytes",
31 : "Bytes written to WAL in a single request",
32 5 : vec![
33 : 1.0,
34 : 10.0,
35 : 100.0,
36 : 1024.0,
37 : 8192.0,
38 5 : 128.0 * 1024.0,
39 5 : 1024.0 * 1024.0,
40 5 : 10.0 * 1024.0 * 1024.0
41 : ]
42 : )
43 5 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
44 5 : });
45 5 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
46 5 : register_histogram!(
47 : "safekeeper_write_wal_seconds",
48 : "Seconds spent writing and syncing WAL to a disk in a single request",
49 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
50 : )
51 5 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
52 5 : });
53 5 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
54 5 : register_histogram!(
55 : "safekeeper_flush_wal_seconds",
56 : "Seconds spent syncing WAL to a disk (excluding segment initialization)",
57 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
58 : )
59 5 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
60 5 : });
61 : /* BEGIN_HADRON */
62 : // Counter of all ProposerAcceptorMessage requests received
63 17 : pub static PROPOSER_ACCEPTOR_MESSAGES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
64 17 : register_int_counter_vec!(
65 : "safekeeper_proposer_acceptor_messages_total",
66 : "Total number of ProposerAcceptorMessage requests received by the Safekeeper.",
67 17 : &["outcome"]
68 : )
69 17 : .expect("Failed to register safekeeper_proposer_acceptor_messages_total counter")
70 17 : });
71 0 : pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
72 0 : register_int_counter!(
73 : "safekeeper_wal_disk_io_errors",
74 : "Number of disk I/O errors when creating and flushing WALs and control files"
75 : )
76 0 : .expect("Failed to register safekeeper_wal_disk_io_errors counter")
77 0 : });
78 0 : pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
79 0 : register_int_counter!(
80 : "safekeeper_wal_storage_limit_errors",
81 : concat!(
82 : "Number of errors due to timeline WAL storage utilization exceeding configured limit. ",
83 : "An increase in this metric indicates issues backing up or removing WALs."
84 : )
85 : )
86 0 : .expect("Failed to register safekeeper_wal_storage_limit_errors counter")
87 0 : });
88 0 : pub static SK_RECOVERY_PULL_TIMELINE_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
89 0 : register_int_counter!(
90 : "safekeeper_recovery_pull_timeline_errors",
91 : concat!(
92 : "Number of errors due to pull_timeline errors during SK lost disk recovery.",
93 : "An increase in this metric indicates pull timelines runs into error."
94 : )
95 : )
96 0 : .expect("Failed to register safekeeper_recovery_pull_timeline_errors counter")
97 0 : });
98 0 : pub static SK_RECOVERY_PULL_TIMELINE_OKS: Lazy<IntCounter> = Lazy::new(|| {
99 0 : register_int_counter!(
100 : "safekeeper_recovery_pull_timeline_oks",
101 : concat!(
102 : "Number of successful pull_timeline during SK lost disk recovery.",
103 : "An increase in this metric indicates pull timelines is successful."
104 : )
105 : )
106 0 : .expect("Failed to register safekeeper_recovery_pull_timeline_oks counter")
107 0 : });
108 0 : pub static SK_RECOVERY_PULL_TIMELINES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
109 0 : register_histogram!(
110 : "safekeeper_recovery_pull_timelines_seconds",
111 : "Seconds to pull timelines",
112 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
113 : )
114 0 : .expect("Failed to register safekeeper_recovery_pull_timelines_seconds histogram")
115 0 : });
116 0 : pub static SK_RECOVERY_PULL_TIMELINE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
117 0 : register_histogram_vec!(
118 : "safekeeper_recovery_pull_timeline_seconds",
119 : "Seconds to pull timeline",
120 0 : &["tenant_id", "timeline_id"],
121 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
122 : )
123 0 : .expect("Failed to register safekeeper_recovery_pull_timeline_seconds histogram vec")
124 0 : });
125 : /* END_HADRON */
126 7 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
127 7 : register_histogram!(
128 : "safekeeper_persist_control_file_seconds",
129 : "Seconds to persist and sync control file",
130 7 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
131 : )
132 7 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
133 7 : });
134 5 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
135 5 : register_histogram_vec!(
136 : "safekeeper_wal_storage_operation_seconds",
137 : "Seconds spent on WAL storage operations",
138 5 : &["operation"],
139 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
140 : )
141 5 : .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
142 5 : });
143 15 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
144 15 : register_histogram_vec!(
145 : "safekeeper_misc_operation_seconds",
146 : "Seconds spent on miscellaneous operations",
147 15 : &["operation"],
148 15 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
149 : )
150 15 : .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
151 15 : });
152 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
153 0 : register_int_counter_vec!(
154 : "safekeeper_pg_io_bytes_total",
155 : "Bytes read from or written to any PostgreSQL connection",
156 0 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
157 : )
158 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
159 0 : });
160 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
161 0 : register_int_counter!(
162 : "safekeeper_broker_pushed_updates_total",
163 : "Number of timeline updates pushed to the broker"
164 : )
165 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
166 0 : });
167 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
168 0 : register_int_counter_vec!(
169 : "safekeeper_broker_pulled_updates_total",
170 : "Number of timeline updates pulled and processed from the broker",
171 0 : &["result"]
172 : )
173 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
174 0 : });
175 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
176 0 : register_int_counter_pair_vec!(
177 : "safekeeper_pg_queries_received_total",
178 : "Number of queries received through pg protocol",
179 : "safekeeper_pg_queries_finished_total",
180 : "Number of queries finished through pg protocol",
181 0 : &["query"]
182 : )
183 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
184 0 : });
185 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
186 0 : register_int_counter!(
187 : "safekeeper_removed_wal_segments_total",
188 : "Number of WAL segments removed from the disk"
189 : )
190 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
191 0 : });
192 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
193 0 : register_int_counter!(
194 : "safekeeper_backed_up_segments_total",
195 : "Number of WAL segments backed up to the S3"
196 : )
197 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
198 0 : });
199 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
200 0 : register_int_counter!(
201 : "safekeeper_backup_errors_total",
202 : "Number of errors during backup"
203 : )
204 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
205 0 : });
206 : /* BEGIN_HADRON */
207 0 : pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
208 0 : register_int_counter!(
209 : "safekeeper_backup_reelect_leader_total",
210 : "Number of times the backup leader was reelected"
211 : )
212 0 : .expect("Failed to register safekeeper_backup_reelect_leader_total counter")
213 0 : });
214 : /* END_HADRON */
215 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
216 0 : register_histogram!(
217 : "safekeeper_broker_push_update_seconds",
218 : "Seconds to push all timeline updates to the broker",
219 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
220 : )
221 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
222 0 : });
223 : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
224 : 1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
225 : ];
226 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
227 0 : register_histogram!(
228 : "safekeeper_broker_iteration_timelines",
229 : "Count of timelines pushed to the broker in a single iteration",
230 0 : TIMELINES_COUNT_BUCKETS.to_vec()
231 : )
232 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
233 0 : });
234 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
235 0 : register_int_counter!(
236 : "safekeeper_received_ps_feedbacks_total",
237 : "Number of pageserver feedbacks received"
238 : )
239 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
240 0 : });
241 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
242 0 : register_int_counter_vec!(
243 : "safekeeper_partial_backup_uploads_total",
244 : "Number of partial backup uploads to the S3",
245 0 : &["result"]
246 : )
247 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
248 0 : });
249 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
250 0 : register_int_counter!(
251 : "safekeeper_partial_backup_uploaded_bytes_total",
252 : "Number of bytes uploaded to the S3 during partial backup"
253 : )
254 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
255 0 : });
256 5 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
257 5 : register_int_counter!(
258 : "safekeeper_manager_iterations_total",
259 : "Number of iterations of the timeline manager task"
260 : )
261 5 : .expect("Failed to register safekeeper_manager_iterations_total counter")
262 5 : });
263 5 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
264 5 : register_int_counter!(
265 : "safekeeper_manager_active_changes_total",
266 : "Number of timeline active status changes in the timeline manager task"
267 : )
268 5 : .expect("Failed to register safekeeper_manager_active_changes_total counter")
269 5 : });
270 0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
271 0 : register_int_counter_pair!(
272 : "safekeeper_wal_backup_tasks_started_total",
273 : "Number of active WAL backup tasks",
274 : "safekeeper_wal_backup_tasks_finished_total",
275 : "Number of finished WAL backup tasks",
276 : )
277 0 : .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
278 0 : });
279 5 : pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
280 5 : register_int_gauge!(
281 : "safekeeper_wal_receivers",
282 : "Number of currently connected WAL receivers (i.e. connected computes)"
283 : )
284 5 : .expect("Failed to register safekeeper_wal_receivers")
285 5 : });
286 4 : pub static WAL_READERS: Lazy<IntGaugeVec> = Lazy::new(|| {
287 4 : register_int_gauge_vec!(
288 : "safekeeper_wal_readers",
289 : "Number of active WAL readers (may serve pageservers or other safekeepers)",
290 4 : &["kind", "target"]
291 : )
292 4 : .expect("Failed to register safekeeper_wal_receivers")
293 4 : });
294 5 : pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
295 : // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
296 : // full queues respectively.
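// Illustrative note (an assumption, not taken from the code): if MSG_QUEUE_SIZE
// were 256 and pow2_buckets(1, 256) yields inclusive powers of two, the final
// layout would be [0, 1, 2, 4, ..., 128, 255, 256], 11 buckets in total, which
// satisfies the assertion below.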
297 5 : let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
298 5 : buckets.insert(0, 0.0);
299 5 : buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
300 5 : assert!(buckets.len() <= 12, "too many histogram buckets");
301 :
302 5 : register_histogram!(
303 : "safekeeper_wal_receiver_queue_depth",
304 : "Number of queued messages per WAL receiver (sampled every 5 seconds)",
305 5 : buckets
306 : )
307 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
308 5 : });
309 5 : pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
310 5 : register_int_gauge!(
311 : "safekeeper_wal_receiver_queue_depth_total",
312 : "Total number of queued messages across all WAL receivers",
313 : )
314 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
315 5 : });
316 : // TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
317 : // MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
318 5 : pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
319 5 : register_int_gauge!(
320 : "safekeeper_wal_receiver_queue_size_total",
321 : "Total memory byte size of queued messages across all WAL receivers",
322 : )
323 5 : .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
324 5 : });
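
// A minimal sketch of the channel wrapper hinted at in the TODO above: it assumes
// a hypothetical `MessageSize` trait and hypothetical `SizedSender`/`SizedReceiver`
// types; the real receiver queue lives in receive_wal.rs and is not wrapped this
// way today. The idea is to adjust the depth/size gauges on send and on receive.
#[allow(dead_code)]
mod queue_size_sketch {
    use super::{WAL_RECEIVER_QUEUE_DEPTH_TOTAL, WAL_RECEIVER_QUEUE_SIZE_TOTAL};

    /// Hypothetical trait: every queued message knows its in-memory size.
    pub trait MessageSize {
        fn size(&self) -> usize;
    }

    /// Sender half that updates the global depth/size gauges on every send.
    pub struct SizedSender<T> {
        inner: tokio::sync::mpsc::Sender<T>,
    }

    impl<T: MessageSize> SizedSender<T> {
        pub async fn send(&self, msg: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
            let size = msg.size() as i64;
            self.inner.send(msg).await?;
            // Account for the message only after it is actually queued.
            WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
            WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size);
            Ok(())
        }
    }

    /// Receiver half that reverses the accounting when a message is dequeued.
    pub struct SizedReceiver<T> {
        inner: tokio::sync::mpsc::Receiver<T>,
    }

    impl<T: MessageSize> SizedReceiver<T> {
        pub async fn recv(&mut self) -> Option<T> {
            let msg = self.inner.recv().await?;
            WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
            WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
            Some(msg)
        }
    }
}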
325 :
326 : // Metrics collected on operations on the storage repository.
327 : #[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
328 : #[strum(serialize_all = "kebab_case")]
329 : pub(crate) enum EvictionEvent {
330 : Evict,
331 : Restore,
332 : }
333 :
334 0 : pub(crate) static EVICTION_EVENTS_STARTED: Lazy<IntCounterVec> = Lazy::new(|| {
335 0 : register_int_counter_vec!(
336 : "safekeeper_eviction_events_started_total",
337 : "Number of eviction state changes, incremented when they start",
338 0 : &["kind"]
339 : )
340 0 : .expect("Failed to register metric")
341 0 : });
342 :
343 0 : pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(|| {
344 0 : register_int_counter_vec!(
345 : "safekeeper_eviction_events_completed_total",
346 : "Number of eviction state changes, incremented when they complete",
347 0 : &["kind"]
348 : )
349 0 : .expect("Failed to register safekeeper_eviction_events_completed_total counter")
350 0 : });
351 :
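// A minimal usage sketch (the function name is hypothetical): the strum
// kebab-case serialization above turns the variants into the "kind" label
// values "evict" and "restore" on the two counters.
#[allow(dead_code)]
fn track_eviction_event_sketch(event: EvictionEvent) {
    let kind: &'static str = event.into();
    EVICTION_EVENTS_STARTED.with_label_values(&[kind]).inc();
    // ... perform the eviction or restore here ...
    EVICTION_EVENTS_COMPLETED.with_label_values(&[kind]).inc();
}
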
352 0 : pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
353 0 : register_int_gauge!(
354 : "safekeeper_evicted_timelines",
355 : "Number of currently evicted timelines"
356 : )
357 0 : .expect("Failed to register safekeeper_evicted_timelines gauge")
358 0 : });
359 :
360 : pub const LABEL_UNKNOWN: &str = "unknown";
361 :
362 : /// Labels for traffic metrics.
363 : #[derive(Clone)]
364 : struct ConnectionLabels {
365 : /// Availability zone of the connection origin.
366 : client_az: String,
367 : /// Availability zone of the current safekeeper.
368 : sk_az: String,
369 : /// Client application name.
370 : app_name: String,
371 : }
372 :
373 : impl ConnectionLabels {
374 0 : fn new() -> Self {
375 0 : Self {
376 0 : client_az: LABEL_UNKNOWN.to_string(),
377 0 : sk_az: LABEL_UNKNOWN.to_string(),
378 0 : app_name: LABEL_UNKNOWN.to_string(),
379 0 : }
380 0 : }
381 :
382 0 : fn build_metrics(
383 0 : &self,
384 0 : ) -> (
385 0 : GenericCounter<metrics::core::AtomicU64>,
386 0 : GenericCounter<metrics::core::AtomicU64>,
387 0 : ) {
388 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
389 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
390 0 : (client_az, sk_az) => {
391 0 : if client_az == sk_az {
392 0 : "true"
393 : } else {
394 0 : "false"
395 : }
396 : }
397 : };
398 :
399 0 : let read = PG_IO_BYTES.with_label_values(&[
400 0 : &self.client_az,
401 0 : &self.sk_az,
402 0 : &self.app_name,
403 0 : "read",
404 0 : same_az,
405 0 : ]);
406 0 : let write = PG_IO_BYTES.with_label_values(&[
407 0 : &self.client_az,
408 0 : &self.sk_az,
409 0 : &self.app_name,
410 0 : "write",
411 0 : same_az,
412 0 : ]);
413 0 : (read, write)
414 0 : }
415 : }
416 :
417 : struct TrafficMetricsState {
418 : /// Labels for traffic metrics.
419 : labels: ConnectionLabels,
420 : /// Total bytes read from this connection.
421 : read: GenericCounter<metrics::core::AtomicU64>,
422 : /// Total bytes written to this connection.
423 : write: GenericCounter<metrics::core::AtomicU64>,
424 : }
425 :
426 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
427 : #[derive(Clone)]
428 : pub struct TrafficMetrics {
429 : state: Arc<RwLock<TrafficMetricsState>>,
430 : }
431 :
432 : impl Default for TrafficMetrics {
433 0 : fn default() -> Self {
434 0 : Self::new()
435 0 : }
436 : }
437 :
438 : impl TrafficMetrics {
439 0 : pub fn new() -> Self {
440 0 : let labels = ConnectionLabels::new();
441 0 : let (read, write) = labels.build_metrics();
442 0 : let state = TrafficMetricsState {
443 0 : labels,
444 0 : read,
445 0 : write,
446 0 : };
447 0 : Self {
448 0 : state: Arc::new(RwLock::new(state)),
449 0 : }
450 0 : }
451 :
452 0 : pub fn set_client_az(&self, value: &str) {
453 0 : let mut state = self.state.write().unwrap();
454 0 : state.labels.client_az = value.to_string();
455 0 : (state.read, state.write) = state.labels.build_metrics();
456 0 : }
457 :
458 0 : pub fn set_sk_az(&self, value: &str) {
459 0 : let mut state = self.state.write().unwrap();
460 0 : state.labels.sk_az = value.to_string();
461 0 : (state.read, state.write) = state.labels.build_metrics();
462 0 : }
463 :
464 0 : pub fn set_app_name(&self, value: &str) {
465 0 : let mut state = self.state.write().unwrap();
466 0 : state.labels.app_name = value.to_string();
467 0 : (state.read, state.write) = state.labels.build_metrics();
468 0 : }
469 :
470 0 : pub fn observe_read(&self, cnt: usize) {
471 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
472 0 : }
473 :
474 0 : pub fn observe_write(&self, cnt: usize) {
475 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
476 0 : }
477 : }
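
// A minimal usage sketch with hypothetical label values: once a connection
// learns the peer's availability zone and application name, the labels are
// updated (rebuilding the underlying counters) and I/O is accounted per call.
#[allow(dead_code)]
fn traffic_metrics_usage_sketch(bytes_read: usize, bytes_written: usize) {
    let metrics = TrafficMetrics::default();
    metrics.set_client_az("az-a");
    metrics.set_sk_az("az-b");
    metrics.set_app_name("walproposer");
    metrics.observe_read(bytes_read);
    metrics.observe_write(bytes_written);
}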
478 :
479 : /// Metrics for WalStorage in a single timeline.
480 : #[derive(Clone, Default)]
481 : pub struct WalStorageMetrics {
482 : /// How many bytes were written in total.
483 : write_wal_bytes: u64,
484 : /// How much time was spent writing WAL to disk, waiting for write(2).
485 : write_wal_seconds: f64,
486 : /// How much time was spent syncing WAL to disk, waiting for fsync(2).
487 : flush_wal_seconds: f64,
488 : }
489 :
490 : impl WalStorageMetrics {
491 620 : pub fn observe_write_bytes(&mut self, bytes: usize) {
492 620 : self.write_wal_bytes += bytes as u64;
493 620 : WRITE_WAL_BYTES.observe(bytes as f64);
494 620 : }
495 :
496 620 : pub fn observe_write_seconds(&mut self, seconds: f64) {
497 620 : self.write_wal_seconds += seconds;
498 620 : WRITE_WAL_SECONDS.observe(seconds);
499 620 : }
500 :
501 625 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
502 625 : self.flush_wal_seconds += seconds;
503 625 : FLUSH_WAL_SECONDS.observe(seconds);
504 625 : }
505 : }
506 :
507 : /// Accepts an async operation (a future returning an empty anyhow Result) and returns the duration of its execution in seconds.
508 1245 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
509 1245 : closure: impl Future<Output = Result<(), E>>,
510 1245 : ) -> Result<f64> {
511 1245 : let start = std::time::Instant::now();
512 1245 : closure.await.map_err(|e| e.into())?;
513 1245 : Ok(start.elapsed().as_secs_f64())
514 1245 : }
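
// A minimal sketch of intended use (the helper and its arguments are
// hypothetical): time an fsync with time_io_closure() and feed the result into
// both the per-timeline WalStorageMetrics and the global FLUSH_WAL_SECONDS
// histogram via observe_flush_seconds().
#[allow(dead_code)]
async fn timed_flush_sketch(metrics: &mut WalStorageMetrics, file: &tokio::fs::File) -> Result<()> {
    let seconds = time_io_closure(file.sync_all()).await?;
    metrics.observe_flush_seconds(seconds);
    Ok(())
}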
515 :
516 : /// Metrics for a single timeline.
517 : #[derive(Clone)]
518 : pub struct FullTimelineInfo {
519 : pub ttid: TenantTimelineId,
520 : pub ps_feedback_count: u64,
521 : pub last_ps_feedback: PageserverFeedback,
522 : pub wal_backup_active: bool,
523 : pub timeline_is_active: bool,
524 : pub num_computes: u32,
525 : pub last_removed_segno: XLogSegNo,
526 : pub interpreted_wal_reader_tasks: usize,
527 :
528 : pub epoch_start_lsn: Lsn,
529 : pub mem_state: TimelineMemState,
530 : pub persisted_state: TimelinePersistentState,
531 :
532 : pub flush_lsn: Lsn,
533 :
534 : pub wal_storage: WalStorageMetrics,
535 : }
536 :
537 : /// Collects metrics for all active timelines.
538 : pub struct TimelineCollector {
539 : global_timelines: Arc<GlobalTimelines>,
540 : descs: Vec<Desc>,
541 : commit_lsn: GenericGaugeVec<AtomicU64>,
542 : backup_lsn: GenericGaugeVec<AtomicU64>,
543 : flush_lsn: GenericGaugeVec<AtomicU64>,
544 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
545 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
546 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
547 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
548 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
549 : ps_feedback_count: GenericGaugeVec<AtomicU64>,
550 : timeline_active: GenericGaugeVec<AtomicU64>,
551 : wal_backup_active: GenericGaugeVec<AtomicU64>,
552 : connected_computes: IntGaugeVec,
553 : disk_usage: GenericGaugeVec<AtomicU64>,
554 : acceptor_term: GenericGaugeVec<AtomicU64>,
555 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
556 : interpreted_wal_reader_tasks: GenericGaugeVec<AtomicU64>,
557 : written_wal_seconds: GaugeVec,
558 : flushed_wal_seconds: GaugeVec,
559 : collect_timeline_metrics: Gauge,
560 : timelines_count: IntGauge,
561 : active_timelines_count: IntGauge,
562 : }
563 :
564 : impl TimelineCollector {
565 0 : pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
566 0 : let mut descs = Vec::new();
567 :
568 0 : let commit_lsn = GenericGaugeVec::new(
569 0 : Opts::new(
570 : "safekeeper_commit_lsn",
571 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
572 : ),
573 0 : &["tenant_id", "timeline_id"],
574 : )
575 0 : .unwrap();
576 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
577 :
578 0 : let backup_lsn = GenericGaugeVec::new(
579 0 : Opts::new(
580 : "safekeeper_backup_lsn",
581 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
582 : ),
583 0 : &["tenant_id", "timeline_id"],
584 : )
585 0 : .unwrap();
586 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
587 :
588 0 : let flush_lsn = GenericGaugeVec::new(
589 0 : Opts::new(
590 : "safekeeper_flush_lsn",
591 : "Current flush_lsn, grouped by timeline",
592 : ),
593 0 : &["tenant_id", "timeline_id"],
594 : )
595 0 : .unwrap();
596 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
597 :
598 0 : let epoch_start_lsn = GenericGaugeVec::new(
599 0 : Opts::new(
600 : "safekeeper_epoch_start_lsn",
601 : "Point since which compute generates new WAL in the current consensus term",
602 : ),
603 0 : &["tenant_id", "timeline_id"],
604 : )
605 0 : .unwrap();
606 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
607 :
608 0 : let peer_horizon_lsn = GenericGaugeVec::new(
609 0 : Opts::new(
610 : "safekeeper_peer_horizon_lsn",
611 : "LSN of the most lagging safekeeper",
612 : ),
613 0 : &["tenant_id", "timeline_id"],
614 : )
615 0 : .unwrap();
616 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
617 :
618 0 : let remote_consistent_lsn = GenericGaugeVec::new(
619 0 : Opts::new(
620 : "safekeeper_remote_consistent_lsn",
621 : "LSN which is persisted to the remote storage in pageserver",
622 : ),
623 0 : &["tenant_id", "timeline_id"],
624 : )
625 0 : .unwrap();
626 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
627 :
628 0 : let ps_last_received_lsn = GenericGaugeVec::new(
629 0 : Opts::new(
630 : "safekeeper_ps_last_received_lsn",
631 : "Last LSN received by the pageserver, acknowledged in the feedback",
632 : ),
633 0 : &["tenant_id", "timeline_id"],
634 : )
635 0 : .unwrap();
636 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
637 :
638 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
639 0 : Opts::new(
640 : "safekeeper_feedback_last_time_seconds",
641 : "Timestamp of the last feedback from the pageserver",
642 : ),
643 0 : &["tenant_id", "timeline_id"],
644 : )
645 0 : .unwrap();
646 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
647 :
648 0 : let ps_feedback_count = GenericGaugeVec::new(
649 0 : Opts::new(
650 : "safekeeper_ps_feedback_count_total",
651 : "Number of feedbacks received from the pageserver",
652 : ),
653 0 : &["tenant_id", "timeline_id"],
654 : )
655 0 : .unwrap();
656 :
657 0 : let timeline_active = GenericGaugeVec::new(
658 0 : Opts::new(
659 : "safekeeper_timeline_active",
660 : "Reports 1 for active timelines, 0 for inactive",
661 : ),
662 0 : &["tenant_id", "timeline_id"],
663 : )
664 0 : .unwrap();
665 0 : descs.extend(timeline_active.desc().into_iter().cloned());
666 :
667 0 : let wal_backup_active = GenericGaugeVec::new(
668 0 : Opts::new(
669 : "safekeeper_wal_backup_active",
670 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
671 : ),
672 0 : &["tenant_id", "timeline_id"],
673 : )
674 0 : .unwrap();
675 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
676 :
677 0 : let connected_computes = IntGaugeVec::new(
678 0 : Opts::new(
679 : "safekeeper_connected_computes",
680 : "Number of active compute connections",
681 : ),
682 0 : &["tenant_id", "timeline_id"],
683 : )
684 0 : .unwrap();
685 0 : descs.extend(connected_computes.desc().into_iter().cloned());
686 :
687 0 : let disk_usage = GenericGaugeVec::new(
688 0 : Opts::new(
689 : "safekeeper_disk_usage_bytes",
690 : "Estimated disk space used to store WAL segments",
691 : ),
692 0 : &["tenant_id", "timeline_id"],
693 : )
694 0 : .unwrap();
695 0 : descs.extend(disk_usage.desc().into_iter().cloned());
696 :
697 0 : let acceptor_term = GenericGaugeVec::new(
698 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
699 0 : &["tenant_id", "timeline_id"],
700 : )
701 0 : .unwrap();
702 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
703 :
704 0 : let written_wal_bytes = GenericGaugeVec::new(
705 0 : Opts::new(
706 : "safekeeper_written_wal_bytes_total",
707 : "Number of WAL bytes written to disk, grouped by timeline",
708 : ),
709 0 : &["tenant_id", "timeline_id"],
710 : )
711 0 : .unwrap();
712 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
713 :
714 0 : let written_wal_seconds = GaugeVec::new(
715 0 : Opts::new(
716 : "safekeeper_written_wal_seconds_total",
717 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
718 : ),
719 0 : &["tenant_id", "timeline_id"],
720 : )
721 0 : .unwrap();
722 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
723 :
724 0 : let flushed_wal_seconds = GaugeVec::new(
725 0 : Opts::new(
726 : "safekeeper_flushed_wal_seconds_total",
727 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
728 : ),
729 0 : &["tenant_id", "timeline_id"],
730 : )
731 0 : .unwrap();
732 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
733 :
734 0 : let collect_timeline_metrics = Gauge::new(
735 : "safekeeper_collect_timeline_metrics_seconds",
736 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
737 : )
738 0 : .unwrap();
739 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
740 :
741 0 : let timelines_count = IntGauge::new(
742 : "safekeeper_timelines",
743 : "Total number of timelines loaded in-memory",
744 : )
745 0 : .unwrap();
746 0 : descs.extend(timelines_count.desc().into_iter().cloned());
747 :
748 0 : let active_timelines_count = IntGauge::new(
749 : "safekeeper_active_timelines",
750 : "Total number of active timelines",
751 : )
752 0 : .unwrap();
753 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
754 :
755 0 : let interpreted_wal_reader_tasks = GenericGaugeVec::new(
756 0 : Opts::new(
757 : "safekeeper_interpreted_wal_reader_tasks",
758 : "Number of active interpreted wal reader tasks, grouped by timeline",
759 : ),
760 0 : &["tenant_id", "timeline_id"],
761 : )
762 0 : .unwrap();
763 0 : descs.extend(interpreted_wal_reader_tasks.desc().into_iter().cloned());
764 :
765 0 : TimelineCollector {
766 0 : global_timelines,
767 0 : descs,
768 0 : commit_lsn,
769 0 : backup_lsn,
770 0 : flush_lsn,
771 0 : epoch_start_lsn,
772 0 : peer_horizon_lsn,
773 0 : remote_consistent_lsn,
774 0 : ps_last_received_lsn,
775 0 : feedback_last_time_seconds,
776 0 : ps_feedback_count,
777 0 : timeline_active,
778 0 : wal_backup_active,
779 0 : connected_computes,
780 0 : disk_usage,
781 0 : acceptor_term,
782 0 : written_wal_bytes,
783 0 : written_wal_seconds,
784 0 : flushed_wal_seconds,
785 0 : collect_timeline_metrics,
786 0 : timelines_count,
787 0 : active_timelines_count,
788 0 : interpreted_wal_reader_tasks,
789 0 : }
790 0 : }
791 : }
792 :
793 : impl Collector for TimelineCollector {
794 0 : fn desc(&self) -> Vec<&Desc> {
795 0 : self.descs.iter().collect()
796 0 : }
797 :
798 0 : fn collect(&self) -> Vec<MetricFamily> {
799 0 : let start_collecting = Instant::now();
800 :
801 : // reset all metrics to clean up inactive timelines
802 0 : self.commit_lsn.reset();
803 0 : self.backup_lsn.reset();
804 0 : self.flush_lsn.reset();
805 0 : self.epoch_start_lsn.reset();
806 0 : self.peer_horizon_lsn.reset();
807 0 : self.remote_consistent_lsn.reset();
808 0 : self.ps_last_received_lsn.reset();
809 0 : self.feedback_last_time_seconds.reset();
810 0 : self.ps_feedback_count.reset();
811 0 : self.timeline_active.reset();
812 0 : self.wal_backup_active.reset();
813 0 : self.connected_computes.reset();
814 0 : self.disk_usage.reset();
815 0 : self.acceptor_term.reset();
816 0 : self.written_wal_bytes.reset();
817 0 : self.interpreted_wal_reader_tasks.reset();
818 0 : self.written_wal_seconds.reset();
819 0 : self.flushed_wal_seconds.reset();
820 :
821 0 : let timelines_count = self.global_timelines.get_all().len();
822 0 : let mut active_timelines_count = 0;
823 :
824 : // The Prometheus Collector trait is sync, but the timeline data is stored
825 : // under an async lock. To bridge the gap, collect the data in a spawned
826 : // thread running its own single-threaded Tokio runtime.
827 0 : let global_timelines = self.global_timelines.clone();
828 0 : let infos = std::thread::spawn(|| {
829 0 : let rt = tokio::runtime::Builder::new_current_thread()
830 0 : .build()
831 0 : .expect("failed to create rt");
832 0 : rt.block_on(collect_timeline_metrics(global_timelines))
833 0 : })
834 0 : .join()
835 0 : .expect("collect_timeline_metrics thread panicked");
836 :
837 0 : for tli in &infos {
838 0 : let tenant_id = tli.ttid.tenant_id.to_string();
839 0 : let timeline_id = tli.ttid.timeline_id.to_string();
840 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
841 :
842 0 : if tli.timeline_is_active {
843 0 : active_timelines_count += 1;
844 0 : }
845 :
846 0 : self.commit_lsn
847 0 : .with_label_values(labels)
848 0 : .set(tli.mem_state.commit_lsn.into());
849 0 : self.backup_lsn
850 0 : .with_label_values(labels)
851 0 : .set(tli.mem_state.backup_lsn.into());
852 0 : self.flush_lsn
853 0 : .with_label_values(labels)
854 0 : .set(tli.flush_lsn.into());
855 0 : self.epoch_start_lsn
856 0 : .with_label_values(labels)
857 0 : .set(tli.epoch_start_lsn.into());
858 0 : self.peer_horizon_lsn
859 0 : .with_label_values(labels)
860 0 : .set(tli.mem_state.peer_horizon_lsn.into());
861 0 : self.remote_consistent_lsn
862 0 : .with_label_values(labels)
863 0 : .set(tli.mem_state.remote_consistent_lsn.into());
864 0 : self.timeline_active
865 0 : .with_label_values(labels)
866 0 : .set(tli.timeline_is_active as u64);
867 0 : self.wal_backup_active
868 0 : .with_label_values(labels)
869 0 : .set(tli.wal_backup_active as u64);
870 0 : self.connected_computes
871 0 : .with_label_values(labels)
872 0 : .set(tli.num_computes as i64);
873 0 : self.acceptor_term
874 0 : .with_label_values(labels)
875 0 : .set(tli.persisted_state.acceptor_state.term);
876 0 : self.written_wal_bytes
877 0 : .with_label_values(labels)
878 0 : .set(tli.wal_storage.write_wal_bytes);
879 0 : self.interpreted_wal_reader_tasks
880 0 : .with_label_values(labels)
881 0 : .set(tli.interpreted_wal_reader_tasks as u64);
882 0 : self.written_wal_seconds
883 0 : .with_label_values(labels)
884 0 : .set(tli.wal_storage.write_wal_seconds);
885 0 : self.flushed_wal_seconds
886 0 : .with_label_values(labels)
887 0 : .set(tli.wal_storage.flush_wal_seconds);
888 :
889 0 : self.ps_last_received_lsn
890 0 : .with_label_values(labels)
891 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
892 0 : self.ps_feedback_count
893 0 : .with_label_values(labels)
894 0 : .set(tli.ps_feedback_count);
895 0 : if let Ok(unix_time) = tli
896 0 : .last_ps_feedback
897 0 : .replytime
898 0 : .duration_since(SystemTime::UNIX_EPOCH)
899 0 : {
900 0 : self.feedback_last_time_seconds
901 0 : .with_label_values(labels)
902 0 : .set(unix_time.as_secs());
903 0 : }
904 :
905 0 : if tli.last_removed_segno != 0 {
906 0 : let segno_count = tli
907 0 : .flush_lsn
908 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
909 0 : - tli.last_removed_segno;
910 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
911 0 : self.disk_usage
912 0 : .with_label_values(labels)
913 0 : .set(disk_usage_bytes);
914 0 : }
915 : }
916 :
917 : // Collect MetricFamilies.
918 0 : let mut mfs = Vec::new();
919 0 : mfs.extend(self.commit_lsn.collect());
920 0 : mfs.extend(self.backup_lsn.collect());
921 0 : mfs.extend(self.flush_lsn.collect());
922 0 : mfs.extend(self.epoch_start_lsn.collect());
923 0 : mfs.extend(self.peer_horizon_lsn.collect());
924 0 : mfs.extend(self.remote_consistent_lsn.collect());
925 0 : mfs.extend(self.ps_last_received_lsn.collect());
926 0 : mfs.extend(self.feedback_last_time_seconds.collect());
927 0 : mfs.extend(self.ps_feedback_count.collect());
928 0 : mfs.extend(self.timeline_active.collect());
929 0 : mfs.extend(self.wal_backup_active.collect());
930 0 : mfs.extend(self.connected_computes.collect());
931 0 : mfs.extend(self.disk_usage.collect());
932 0 : mfs.extend(self.acceptor_term.collect());
933 0 : mfs.extend(self.written_wal_bytes.collect());
934 0 : mfs.extend(self.interpreted_wal_reader_tasks.collect());
935 0 : mfs.extend(self.written_wal_seconds.collect());
936 0 : mfs.extend(self.flushed_wal_seconds.collect());
937 :
938 : // report time it took to collect all info
939 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
940 0 : self.collect_timeline_metrics.set(elapsed);
941 0 : mfs.extend(self.collect_timeline_metrics.collect());
942 :
943 : // report total number of timelines
944 0 : self.timelines_count.set(timelines_count as i64);
945 0 : mfs.extend(self.timelines_count.collect());
946 :
947 0 : self.active_timelines_count
948 0 : .set(active_timelines_count as i64);
949 0 : mfs.extend(self.active_timelines_count.collect());
950 :
951 0 : mfs
952 0 : }
953 : }
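
// A minimal sketch (the function is hypothetical): in production the collector
// is registered with the global Prometheus registry at startup, but a scrape is
// conceptually just a collect() call, which resets the per-timeline gauge
// vectors and repopulates them from the currently active timelines.
#[allow(dead_code)]
fn scrape_timeline_metrics_sketch(global_timelines: Arc<GlobalTimelines>) -> Vec<MetricFamily> {
    let collector = TimelineCollector::new(global_timelines);
    collector.collect()
}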
954 :
955 0 : async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
956 0 : let mut res = vec![];
957 0 : let active_timelines = global_timelines.get_global_broker_active_set().get_all();
958 :
959 0 : for tli in active_timelines {
960 0 : if let Some(info) = tli.info_for_metrics().await {
961 0 : res.push(info);
962 0 : }
963 : }
964 0 : res
965 0 : }