Line data Source code
1 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::sync::{Arc, RwLock};
4 : use std::time::{Instant, SystemTime};
5 :
6 : use anyhow::Result;
7 : use futures::Future;
8 : use metrics::core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts};
9 : use metrics::proto::MetricFamily;
10 : use metrics::{
11 : DISK_FSYNC_SECONDS_BUCKETS, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
12 : IntCounterPair, IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, pow2_buckets,
13 : register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
14 : register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
15 : register_int_gauge_vec,
16 : };
17 : use once_cell::sync::Lazy;
18 : use postgres_ffi::XLogSegNo;
19 : use utils::id::TenantTimelineId;
20 : use utils::lsn::Lsn;
21 : use utils::pageserver_feedback::PageserverFeedback;
22 :
23 : use crate::GlobalTimelines;
24 : use crate::receive_wal::MSG_QUEUE_SIZE;
25 : use crate::state::{TimelineMemState, TimelinePersistentState};
26 :
27 : // Global metrics across all timelines.
28 5 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
29 5 : register_histogram!(
30 : "safekeeper_write_wal_bytes",
31 : "Bytes written to WAL in a single request",
32 5 : vec![
33 : 1.0,
34 : 10.0,
35 : 100.0,
36 : 1024.0,
37 : 8192.0,
38 5 : 128.0 * 1024.0,
39 5 : 1024.0 * 1024.0,
40 5 : 10.0 * 1024.0 * 1024.0
41 : ]
42 : )
43 5 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
44 5 : });
45 5 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
46 5 : register_histogram!(
47 : "safekeeper_write_wal_seconds",
48 : "Seconds spent writing and syncing WAL to a disk in a single request",
49 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
50 : )
51 5 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
52 5 : });
53 5 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
54 5 : register_histogram!(
55 : "safekeeper_flush_wal_seconds",
56 : "Seconds spent syncing WAL to a disk (excluding segment initialization)",
57 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
58 : )
59 5 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
60 5 : });
61 : /* BEGIN_HADRON */
62 0 : pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
63 0 : register_int_counter!(
64 : "safekeeper_wal_disk_io_errors",
65 : "Number of disk I/O errors when creating and flushing WALs and control files"
66 : )
67 0 : .expect("Failed to register safekeeper_wal_disk_io_errors counter")
68 0 : });
69 0 : pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
70 0 : register_int_counter!(
71 : "safekeeper_wal_storage_limit_errors",
72 : concat!(
73 : "Number of errors due to timeline WAL storage utilization exceeding configured limit. ",
74 : "An increase in this metric indicates issues backing up or removing WALs."
75 : )
76 : )
77 0 : .expect("Failed to register safekeeper_wal_storage_limit_errors counter")
78 0 : });
79 : /* END_HADRON */
80 7 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
81 7 : register_histogram!(
82 : "safekeeper_persist_control_file_seconds",
83 : "Seconds to persist and sync control file",
84 7 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
85 : )
86 7 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
87 7 : });
88 5 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
89 5 : register_histogram_vec!(
90 : "safekeeper_wal_storage_operation_seconds",
91 : "Seconds spent on WAL storage operations",
92 5 : &["operation"],
93 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
94 : )
95 5 : .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
96 5 : });
97 15 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
98 15 : register_histogram_vec!(
99 : "safekeeper_misc_operation_seconds",
100 : "Seconds spent on miscellaneous operations",
101 15 : &["operation"],
102 15 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
103 : )
104 15 : .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
105 15 : });
106 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
107 0 : register_int_counter_vec!(
108 : "safekeeper_pg_io_bytes_total",
109 : "Bytes read from or written to any PostgreSQL connection",
110 0 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
111 : )
112 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
113 0 : });
114 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
115 0 : register_int_counter!(
116 : "safekeeper_broker_pushed_updates_total",
117 : "Number of timeline updates pushed to the broker"
118 : )
119 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
120 0 : });
121 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
122 0 : register_int_counter_vec!(
123 : "safekeeper_broker_pulled_updates_total",
124 : "Number of timeline updates pulled and processed from the broker",
125 0 : &["result"]
126 : )
127 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
128 0 : });
129 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
130 0 : register_int_counter_pair_vec!(
131 : "safekeeper_pg_queries_received_total",
132 : "Number of queries received through pg protocol",
133 : "safekeeper_pg_queries_finished_total",
134 : "Number of queries finished through pg protocol",
135 0 : &["query"]
136 : )
137 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
138 0 : });
139 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
140 0 : register_int_counter!(
141 : "safekeeper_removed_wal_segments_total",
142 : "Number of WAL segments removed from the disk"
143 : )
144 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
145 0 : });
146 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
147 0 : register_int_counter!(
148 : "safekeeper_backed_up_segments_total",
149 : "Number of WAL segments backed up to the S3"
150 : )
151 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
152 0 : });
153 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
154 0 : register_int_counter!(
155 : "safekeeper_backup_errors_total",
156 : "Number of errors during backup"
157 : )
158 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
159 0 : });
160 : /* BEGIN_HADRON */
161 0 : pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
162 0 : register_int_counter!(
163 : "safekeeper_backup_reelect_leader_total",
164 : "Number of times the backup leader was reelected"
165 : )
166 0 : .expect("Failed to register safekeeper_backup_reelect_leader_total counter")
167 0 : });
168 : /* END_HADRON */
169 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
170 0 : register_histogram!(
171 : "safekeeper_broker_push_update_seconds",
172 : "Seconds to push all timeline updates to the broker",
173 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
174 : )
175 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
176 0 : });
/// Histogram bucket boundaries for metrics that observe a number of timelines.
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
];
180 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
181 0 : register_histogram!(
182 : "safekeeper_broker_iteration_timelines",
183 : "Count of timelines pushed to the broker in a single iteration",
184 0 : TIMELINES_COUNT_BUCKETS.to_vec()
185 : )
186 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
187 0 : });
188 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
189 0 : register_int_counter!(
190 : "safekeeper_received_ps_feedbacks_total",
191 : "Number of pageserver feedbacks received"
192 : )
193 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
194 0 : });
195 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
196 0 : register_int_counter_vec!(
197 : "safekeeper_partial_backup_uploads_total",
198 : "Number of partial backup uploads to the S3",
199 0 : &["result"]
200 : )
201 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
202 0 : });
203 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
204 0 : register_int_counter!(
205 : "safekeeper_partial_backup_uploaded_bytes_total",
206 : "Number of bytes uploaded to the S3 during partial backup"
207 : )
208 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
209 0 : });
210 5 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
211 5 : register_int_counter!(
212 : "safekeeper_manager_iterations_total",
213 : "Number of iterations of the timeline manager task"
214 : )
215 5 : .expect("Failed to register safekeeper_manager_iterations_total counter")
216 5 : });
217 5 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
218 5 : register_int_counter!(
219 : "safekeeper_manager_active_changes_total",
220 : "Number of timeline active status changes in the timeline manager task"
221 : )
222 5 : .expect("Failed to register safekeeper_manager_active_changes_total counter")
223 5 : });
224 0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
225 0 : register_int_counter_pair!(
226 : "safekeeper_wal_backup_tasks_started_total",
227 : "Number of active WAL backup tasks",
228 : "safekeeper_wal_backup_tasks_finished_total",
229 : "Number of finished WAL backup tasks",
230 : )
231 0 : .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
232 0 : });
233 5 : pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
234 5 : register_int_gauge!(
235 : "safekeeper_wal_receivers",
236 : "Number of currently connected WAL receivers (i.e. connected computes)"
237 : )
238 5 : .expect("Failed to register safekeeper_wal_receivers")
239 5 : });
240 4 : pub static WAL_READERS: Lazy<IntGaugeVec> = Lazy::new(|| {
241 4 : register_int_gauge_vec!(
242 : "safekeeper_wal_readers",
243 : "Number of active WAL readers (may serve pageservers or other safekeepers)",
244 4 : &["kind", "target"]
245 : )
246 4 : .expect("Failed to register safekeeper_wal_receivers")
247 4 : });
248 5 : pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
249 : // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
250 : // full queues respectively.
251 5 : let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
252 5 : buckets.insert(0, 0.0);
253 5 : buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
254 5 : assert!(buckets.len() <= 12, "too many histogram buckets");
255 :
256 5 : register_histogram!(
257 : "safekeeper_wal_receiver_queue_depth",
258 : "Number of queued messages per WAL receiver (sampled every 5 seconds)",
259 5 : buckets
260 : )
261 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
262 5 : });
263 5 : pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
264 5 : register_int_gauge!(
265 : "safekeeper_wal_receiver_queue_depth_total",
266 : "Total number of queued messages across all WAL receivers",
267 : )
268 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
269 5 : });
270 : // TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
271 : // MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
272 5 : pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
273 5 : register_int_gauge!(
274 : "safekeeper_wal_receiver_queue_size_total",
275 : "Total memory byte size of queued messages across all WAL receivers",
276 : )
277 5 : .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
278 5 : });
279 :
280 : // Metrics collected on operations on the storage repository.
281 : #[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
282 : #[strum(serialize_all = "kebab_case")]
283 : pub(crate) enum EvictionEvent {
284 : Evict,
285 : Restore,
286 : }
287 :
288 0 : pub(crate) static EVICTION_EVENTS_STARTED: Lazy<IntCounterVec> = Lazy::new(|| {
289 0 : register_int_counter_vec!(
290 : "safekeeper_eviction_events_started_total",
291 : "Number of eviction state changes, incremented when they start",
292 0 : &["kind"]
293 : )
294 0 : .expect("Failed to register metric")
295 0 : });
296 :
297 0 : pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(|| {
298 0 : register_int_counter_vec!(
299 : "safekeeper_eviction_events_completed_total",
300 : "Number of eviction state changes, incremented when they complete",
301 0 : &["kind"]
302 : )
303 0 : .expect("Failed to register metric")
304 0 : });
305 :
306 0 : pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
307 0 : register_int_gauge!(
308 : "safekeeper_evicted_timelines",
309 : "Number of currently evicted timelines"
310 : )
311 0 : .expect("Failed to register metric")
312 0 : });
313 :
/// Label value used for traffic metrics when the real value has not been set.
pub const LABEL_UNKNOWN: &str = "unknown";
315 :
316 : /// Labels for traffic metrics.
317 : #[derive(Clone)]
318 : struct ConnectionLabels {
319 : /// Availability zone of the connection origin.
320 : client_az: String,
321 : /// Availability zone of the current safekeeper.
322 : sk_az: String,
323 : /// Client application name.
324 : app_name: String,
325 : }
326 :
327 : impl ConnectionLabels {
328 0 : fn new() -> Self {
329 0 : Self {
330 0 : client_az: LABEL_UNKNOWN.to_string(),
331 0 : sk_az: LABEL_UNKNOWN.to_string(),
332 0 : app_name: LABEL_UNKNOWN.to_string(),
333 0 : }
334 0 : }
335 :
336 0 : fn build_metrics(
337 0 : &self,
338 0 : ) -> (
339 0 : GenericCounter<metrics::core::AtomicU64>,
340 0 : GenericCounter<metrics::core::AtomicU64>,
341 0 : ) {
342 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
343 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
344 0 : (client_az, sk_az) => {
345 0 : if client_az == sk_az {
346 0 : "true"
347 : } else {
348 0 : "false"
349 : }
350 : }
351 : };
352 :
353 0 : let read = PG_IO_BYTES.with_label_values(&[
354 0 : &self.client_az,
355 0 : &self.sk_az,
356 0 : &self.app_name,
357 0 : "read",
358 0 : same_az,
359 0 : ]);
360 0 : let write = PG_IO_BYTES.with_label_values(&[
361 0 : &self.client_az,
362 0 : &self.sk_az,
363 0 : &self.app_name,
364 0 : "write",
365 0 : same_az,
366 0 : ]);
367 0 : (read, write)
368 0 : }
369 : }
370 :
371 : struct TrafficMetricsState {
372 : /// Labels for traffic metrics.
373 : labels: ConnectionLabels,
374 : /// Total bytes read from this connection.
375 : read: GenericCounter<metrics::core::AtomicU64>,
376 : /// Total bytes written to this connection.
377 : write: GenericCounter<metrics::core::AtomicU64>,
378 : }
379 :
380 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
381 : #[derive(Clone)]
382 : pub struct TrafficMetrics {
383 : state: Arc<RwLock<TrafficMetricsState>>,
384 : }
385 :
386 : impl Default for TrafficMetrics {
387 0 : fn default() -> Self {
388 0 : Self::new()
389 0 : }
390 : }
391 :
392 : impl TrafficMetrics {
393 0 : pub fn new() -> Self {
394 0 : let labels = ConnectionLabels::new();
395 0 : let (read, write) = labels.build_metrics();
396 0 : let state = TrafficMetricsState {
397 0 : labels,
398 0 : read,
399 0 : write,
400 0 : };
401 0 : Self {
402 0 : state: Arc::new(RwLock::new(state)),
403 0 : }
404 0 : }
405 :
406 0 : pub fn set_client_az(&self, value: &str) {
407 0 : let mut state = self.state.write().unwrap();
408 0 : state.labels.client_az = value.to_string();
409 0 : (state.read, state.write) = state.labels.build_metrics();
410 0 : }
411 :
412 0 : pub fn set_sk_az(&self, value: &str) {
413 0 : let mut state = self.state.write().unwrap();
414 0 : state.labels.sk_az = value.to_string();
415 0 : (state.read, state.write) = state.labels.build_metrics();
416 0 : }
417 :
418 0 : pub fn set_app_name(&self, value: &str) {
419 0 : let mut state = self.state.write().unwrap();
420 0 : state.labels.app_name = value.to_string();
421 0 : (state.read, state.write) = state.labels.build_metrics();
422 0 : }
423 :
424 0 : pub fn observe_read(&self, cnt: usize) {
425 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
426 0 : }
427 :
428 0 : pub fn observe_write(&self, cnt: usize) {
429 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
430 0 : }
431 : }
432 :
433 : /// Metrics for WalStorage in a single timeline.
434 : #[derive(Clone, Default)]
435 : pub struct WalStorageMetrics {
436 : /// How much bytes were written in total.
437 : write_wal_bytes: u64,
438 : /// How much time spent writing WAL to disk, waiting for write(2).
439 : write_wal_seconds: f64,
440 : /// How much time spent syncing WAL to disk, waiting for fsync(2).
441 : flush_wal_seconds: f64,
442 : }
443 :
444 : impl WalStorageMetrics {
445 620 : pub fn observe_write_bytes(&mut self, bytes: usize) {
446 620 : self.write_wal_bytes += bytes as u64;
447 620 : WRITE_WAL_BYTES.observe(bytes as f64);
448 620 : }
449 :
450 620 : pub fn observe_write_seconds(&mut self, seconds: f64) {
451 620 : self.write_wal_seconds += seconds;
452 620 : WRITE_WAL_SECONDS.observe(seconds);
453 620 : }
454 :
455 625 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
456 625 : self.flush_wal_seconds += seconds;
457 625 : FLUSH_WAL_SECONDS.observe(seconds);
458 625 : }
459 : }
460 :
461 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
462 1245 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
463 1245 : closure: impl Future<Output = Result<(), E>>,
464 1245 : ) -> Result<f64> {
465 1245 : let start = std::time::Instant::now();
466 1245 : closure.await.map_err(|e| e.into())?;
467 1245 : Ok(start.elapsed().as_secs_f64())
468 1245 : }
469 :
470 : /// Metrics for a single timeline.
471 : #[derive(Clone)]
472 : pub struct FullTimelineInfo {
473 : pub ttid: TenantTimelineId,
474 : pub ps_feedback_count: u64,
475 : pub last_ps_feedback: PageserverFeedback,
476 : pub wal_backup_active: bool,
477 : pub timeline_is_active: bool,
478 : pub num_computes: u32,
479 : pub last_removed_segno: XLogSegNo,
480 : pub interpreted_wal_reader_tasks: usize,
481 :
482 : pub epoch_start_lsn: Lsn,
483 : pub mem_state: TimelineMemState,
484 : pub persisted_state: TimelinePersistentState,
485 :
486 : pub flush_lsn: Lsn,
487 :
488 : pub wal_storage: WalStorageMetrics,
489 : }
490 :
491 : /// Collects metrics for all active timelines.
492 : pub struct TimelineCollector {
493 : global_timelines: Arc<GlobalTimelines>,
494 : descs: Vec<Desc>,
495 : commit_lsn: GenericGaugeVec<AtomicU64>,
496 : backup_lsn: GenericGaugeVec<AtomicU64>,
497 : flush_lsn: GenericGaugeVec<AtomicU64>,
498 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
499 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
500 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
501 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
502 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
503 : ps_feedback_count: GenericGaugeVec<AtomicU64>,
504 : timeline_active: GenericGaugeVec<AtomicU64>,
505 : wal_backup_active: GenericGaugeVec<AtomicU64>,
506 : connected_computes: IntGaugeVec,
507 : disk_usage: GenericGaugeVec<AtomicU64>,
508 : acceptor_term: GenericGaugeVec<AtomicU64>,
509 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
510 : interpreted_wal_reader_tasks: GenericGaugeVec<AtomicU64>,
511 : written_wal_seconds: GaugeVec,
512 : flushed_wal_seconds: GaugeVec,
513 : collect_timeline_metrics: Gauge,
514 : timelines_count: IntGauge,
515 : active_timelines_count: IntGauge,
516 : }
517 :
518 : impl TimelineCollector {
519 0 : pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
520 0 : let mut descs = Vec::new();
521 :
522 0 : let commit_lsn = GenericGaugeVec::new(
523 0 : Opts::new(
524 : "safekeeper_commit_lsn",
525 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
526 : ),
527 0 : &["tenant_id", "timeline_id"],
528 : )
529 0 : .unwrap();
530 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
531 :
532 0 : let backup_lsn = GenericGaugeVec::new(
533 0 : Opts::new(
534 : "safekeeper_backup_lsn",
535 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
536 : ),
537 0 : &["tenant_id", "timeline_id"],
538 : )
539 0 : .unwrap();
540 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
541 :
542 0 : let flush_lsn = GenericGaugeVec::new(
543 0 : Opts::new(
544 : "safekeeper_flush_lsn",
545 : "Current flush_lsn, grouped by timeline",
546 : ),
547 0 : &["tenant_id", "timeline_id"],
548 : )
549 0 : .unwrap();
550 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
551 :
552 0 : let epoch_start_lsn = GenericGaugeVec::new(
553 0 : Opts::new(
554 : "safekeeper_epoch_start_lsn",
555 : "Point since which compute generates new WAL in the current consensus term",
556 : ),
557 0 : &["tenant_id", "timeline_id"],
558 : )
559 0 : .unwrap();
560 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
561 :
562 0 : let peer_horizon_lsn = GenericGaugeVec::new(
563 0 : Opts::new(
564 : "safekeeper_peer_horizon_lsn",
565 : "LSN of the most lagging safekeeper",
566 : ),
567 0 : &["tenant_id", "timeline_id"],
568 : )
569 0 : .unwrap();
570 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
571 :
572 0 : let remote_consistent_lsn = GenericGaugeVec::new(
573 0 : Opts::new(
574 : "safekeeper_remote_consistent_lsn",
575 : "LSN which is persisted to the remote storage in pageserver",
576 : ),
577 0 : &["tenant_id", "timeline_id"],
578 : )
579 0 : .unwrap();
580 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
581 :
582 0 : let ps_last_received_lsn = GenericGaugeVec::new(
583 0 : Opts::new(
584 : "safekeeper_ps_last_received_lsn",
585 : "Last LSN received by the pageserver, acknowledged in the feedback",
586 : ),
587 0 : &["tenant_id", "timeline_id"],
588 : )
589 0 : .unwrap();
590 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
591 :
592 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
593 0 : Opts::new(
594 : "safekeeper_feedback_last_time_seconds",
595 : "Timestamp of the last feedback from the pageserver",
596 : ),
597 0 : &["tenant_id", "timeline_id"],
598 : )
599 0 : .unwrap();
600 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
601 :
602 0 : let ps_feedback_count = GenericGaugeVec::new(
603 0 : Opts::new(
604 : "safekeeper_ps_feedback_count_total",
605 : "Number of feedbacks received from the pageserver",
606 : ),
607 0 : &["tenant_id", "timeline_id"],
608 : )
609 0 : .unwrap();
610 :
611 0 : let timeline_active = GenericGaugeVec::new(
612 0 : Opts::new(
613 : "safekeeper_timeline_active",
614 : "Reports 1 for active timelines, 0 for inactive",
615 : ),
616 0 : &["tenant_id", "timeline_id"],
617 : )
618 0 : .unwrap();
619 0 : descs.extend(timeline_active.desc().into_iter().cloned());
620 :
621 0 : let wal_backup_active = GenericGaugeVec::new(
622 0 : Opts::new(
623 : "safekeeper_wal_backup_active",
624 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
625 : ),
626 0 : &["tenant_id", "timeline_id"],
627 : )
628 0 : .unwrap();
629 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
630 :
631 0 : let connected_computes = IntGaugeVec::new(
632 0 : Opts::new(
633 : "safekeeper_connected_computes",
634 : "Number of active compute connections",
635 : ),
636 0 : &["tenant_id", "timeline_id"],
637 : )
638 0 : .unwrap();
639 0 : descs.extend(connected_computes.desc().into_iter().cloned());
640 :
641 0 : let disk_usage = GenericGaugeVec::new(
642 0 : Opts::new(
643 : "safekeeper_disk_usage_bytes",
644 : "Estimated disk space used to store WAL segments",
645 : ),
646 0 : &["tenant_id", "timeline_id"],
647 : )
648 0 : .unwrap();
649 0 : descs.extend(disk_usage.desc().into_iter().cloned());
650 :
651 0 : let acceptor_term = GenericGaugeVec::new(
652 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
653 0 : &["tenant_id", "timeline_id"],
654 : )
655 0 : .unwrap();
656 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
657 :
658 0 : let written_wal_bytes = GenericGaugeVec::new(
659 0 : Opts::new(
660 : "safekeeper_written_wal_bytes_total",
661 : "Number of WAL bytes written to disk, grouped by timeline",
662 : ),
663 0 : &["tenant_id", "timeline_id"],
664 : )
665 0 : .unwrap();
666 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
667 :
668 0 : let written_wal_seconds = GaugeVec::new(
669 0 : Opts::new(
670 : "safekeeper_written_wal_seconds_total",
671 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
672 : ),
673 0 : &["tenant_id", "timeline_id"],
674 : )
675 0 : .unwrap();
676 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
677 :
678 0 : let flushed_wal_seconds = GaugeVec::new(
679 0 : Opts::new(
680 : "safekeeper_flushed_wal_seconds_total",
681 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
682 : ),
683 0 : &["tenant_id", "timeline_id"],
684 : )
685 0 : .unwrap();
686 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
687 :
688 0 : let collect_timeline_metrics = Gauge::new(
689 : "safekeeper_collect_timeline_metrics_seconds",
690 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
691 : )
692 0 : .unwrap();
693 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
694 :
695 0 : let timelines_count = IntGauge::new(
696 : "safekeeper_timelines",
697 : "Total number of timelines loaded in-memory",
698 : )
699 0 : .unwrap();
700 0 : descs.extend(timelines_count.desc().into_iter().cloned());
701 :
702 0 : let active_timelines_count = IntGauge::new(
703 : "safekeeper_active_timelines",
704 : "Total number of active timelines",
705 : )
706 0 : .unwrap();
707 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
708 :
709 0 : let interpreted_wal_reader_tasks = GenericGaugeVec::new(
710 0 : Opts::new(
711 : "safekeeper_interpreted_wal_reader_tasks",
712 : "Number of active interpreted wal reader tasks, grouped by timeline",
713 : ),
714 0 : &["tenant_id", "timeline_id"],
715 : )
716 0 : .unwrap();
717 0 : descs.extend(interpreted_wal_reader_tasks.desc().into_iter().cloned());
718 :
719 0 : TimelineCollector {
720 0 : global_timelines,
721 0 : descs,
722 0 : commit_lsn,
723 0 : backup_lsn,
724 0 : flush_lsn,
725 0 : epoch_start_lsn,
726 0 : peer_horizon_lsn,
727 0 : remote_consistent_lsn,
728 0 : ps_last_received_lsn,
729 0 : feedback_last_time_seconds,
730 0 : ps_feedback_count,
731 0 : timeline_active,
732 0 : wal_backup_active,
733 0 : connected_computes,
734 0 : disk_usage,
735 0 : acceptor_term,
736 0 : written_wal_bytes,
737 0 : written_wal_seconds,
738 0 : flushed_wal_seconds,
739 0 : collect_timeline_metrics,
740 0 : timelines_count,
741 0 : active_timelines_count,
742 0 : interpreted_wal_reader_tasks,
743 0 : }
744 0 : }
745 : }
746 :
747 : impl Collector for TimelineCollector {
748 0 : fn desc(&self) -> Vec<&Desc> {
749 0 : self.descs.iter().collect()
750 0 : }
751 :
752 0 : fn collect(&self) -> Vec<MetricFamily> {
753 0 : let start_collecting = Instant::now();
754 :
755 : // reset all metrics to clean up inactive timelines
756 0 : self.commit_lsn.reset();
757 0 : self.backup_lsn.reset();
758 0 : self.flush_lsn.reset();
759 0 : self.epoch_start_lsn.reset();
760 0 : self.peer_horizon_lsn.reset();
761 0 : self.remote_consistent_lsn.reset();
762 0 : self.ps_last_received_lsn.reset();
763 0 : self.feedback_last_time_seconds.reset();
764 0 : self.ps_feedback_count.reset();
765 0 : self.timeline_active.reset();
766 0 : self.wal_backup_active.reset();
767 0 : self.connected_computes.reset();
768 0 : self.disk_usage.reset();
769 0 : self.acceptor_term.reset();
770 0 : self.written_wal_bytes.reset();
771 0 : self.interpreted_wal_reader_tasks.reset();
772 0 : self.written_wal_seconds.reset();
773 0 : self.flushed_wal_seconds.reset();
774 :
775 0 : let timelines_count = self.global_timelines.get_all().len();
776 0 : let mut active_timelines_count = 0;
777 :
778 : // Prometheus Collector is sync, and data is stored under async lock. To
779 : // bridge the gap with a crutch, collect data in spawned thread with
780 : // local tokio runtime.
781 0 : let global_timelines = self.global_timelines.clone();
782 0 : let infos = std::thread::spawn(|| {
783 0 : let rt = tokio::runtime::Builder::new_current_thread()
784 0 : .build()
785 0 : .expect("failed to create rt");
786 0 : rt.block_on(collect_timeline_metrics(global_timelines))
787 0 : })
788 0 : .join()
789 0 : .expect("collect_timeline_metrics thread panicked");
790 :
791 0 : for tli in &infos {
792 0 : let tenant_id = tli.ttid.tenant_id.to_string();
793 0 : let timeline_id = tli.ttid.timeline_id.to_string();
794 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
795 :
796 0 : if tli.timeline_is_active {
797 0 : active_timelines_count += 1;
798 0 : }
799 :
800 0 : self.commit_lsn
801 0 : .with_label_values(labels)
802 0 : .set(tli.mem_state.commit_lsn.into());
803 0 : self.backup_lsn
804 0 : .with_label_values(labels)
805 0 : .set(tli.mem_state.backup_lsn.into());
806 0 : self.flush_lsn
807 0 : .with_label_values(labels)
808 0 : .set(tli.flush_lsn.into());
809 0 : self.epoch_start_lsn
810 0 : .with_label_values(labels)
811 0 : .set(tli.epoch_start_lsn.into());
812 0 : self.peer_horizon_lsn
813 0 : .with_label_values(labels)
814 0 : .set(tli.mem_state.peer_horizon_lsn.into());
815 0 : self.remote_consistent_lsn
816 0 : .with_label_values(labels)
817 0 : .set(tli.mem_state.remote_consistent_lsn.into());
818 0 : self.timeline_active
819 0 : .with_label_values(labels)
820 0 : .set(tli.timeline_is_active as u64);
821 0 : self.wal_backup_active
822 0 : .with_label_values(labels)
823 0 : .set(tli.wal_backup_active as u64);
824 0 : self.connected_computes
825 0 : .with_label_values(labels)
826 0 : .set(tli.num_computes as i64);
827 0 : self.acceptor_term
828 0 : .with_label_values(labels)
829 0 : .set(tli.persisted_state.acceptor_state.term);
830 0 : self.written_wal_bytes
831 0 : .with_label_values(labels)
832 0 : .set(tli.wal_storage.write_wal_bytes);
833 0 : self.interpreted_wal_reader_tasks
834 0 : .with_label_values(labels)
835 0 : .set(tli.interpreted_wal_reader_tasks as u64);
836 0 : self.written_wal_seconds
837 0 : .with_label_values(labels)
838 0 : .set(tli.wal_storage.write_wal_seconds);
839 0 : self.flushed_wal_seconds
840 0 : .with_label_values(labels)
841 0 : .set(tli.wal_storage.flush_wal_seconds);
842 :
843 0 : self.ps_last_received_lsn
844 0 : .with_label_values(labels)
845 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
846 0 : self.ps_feedback_count
847 0 : .with_label_values(labels)
848 0 : .set(tli.ps_feedback_count);
849 0 : if let Ok(unix_time) = tli
850 0 : .last_ps_feedback
851 0 : .replytime
852 0 : .duration_since(SystemTime::UNIX_EPOCH)
853 0 : {
854 0 : self.feedback_last_time_seconds
855 0 : .with_label_values(labels)
856 0 : .set(unix_time.as_secs());
857 0 : }
858 :
859 0 : if tli.last_removed_segno != 0 {
860 0 : let segno_count = tli
861 0 : .flush_lsn
862 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
863 0 : - tli.last_removed_segno;
864 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
865 0 : self.disk_usage
866 0 : .with_label_values(labels)
867 0 : .set(disk_usage_bytes);
868 0 : }
869 : }
870 :
871 : // collect MetricFamilys.
872 0 : let mut mfs = Vec::new();
873 0 : mfs.extend(self.commit_lsn.collect());
874 0 : mfs.extend(self.backup_lsn.collect());
875 0 : mfs.extend(self.flush_lsn.collect());
876 0 : mfs.extend(self.epoch_start_lsn.collect());
877 0 : mfs.extend(self.peer_horizon_lsn.collect());
878 0 : mfs.extend(self.remote_consistent_lsn.collect());
879 0 : mfs.extend(self.ps_last_received_lsn.collect());
880 0 : mfs.extend(self.feedback_last_time_seconds.collect());
881 0 : mfs.extend(self.ps_feedback_count.collect());
882 0 : mfs.extend(self.timeline_active.collect());
883 0 : mfs.extend(self.wal_backup_active.collect());
884 0 : mfs.extend(self.connected_computes.collect());
885 0 : mfs.extend(self.disk_usage.collect());
886 0 : mfs.extend(self.acceptor_term.collect());
887 0 : mfs.extend(self.written_wal_bytes.collect());
888 0 : mfs.extend(self.interpreted_wal_reader_tasks.collect());
889 0 : mfs.extend(self.written_wal_seconds.collect());
890 0 : mfs.extend(self.flushed_wal_seconds.collect());
891 :
892 : // report time it took to collect all info
893 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
894 0 : self.collect_timeline_metrics.set(elapsed);
895 0 : mfs.extend(self.collect_timeline_metrics.collect());
896 :
897 : // report total number of timelines
898 0 : self.timelines_count.set(timelines_count as i64);
899 0 : mfs.extend(self.timelines_count.collect());
900 :
901 0 : self.active_timelines_count
902 0 : .set(active_timelines_count as i64);
903 0 : mfs.extend(self.active_timelines_count.collect());
904 :
905 0 : mfs
906 0 : }
907 : }
908 :
909 0 : async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
910 0 : let mut res = vec![];
911 0 : let active_timelines = global_timelines.get_global_broker_active_set().get_all();
912 :
913 0 : for tli in active_timelines {
914 0 : if let Some(info) = tli.info_for_metrics().await {
915 0 : res.push(info);
916 0 : }
917 : }
918 0 : res
919 0 : }
|