Line data Source code
1 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::sync::{Arc, RwLock};
4 : use std::time::{Instant, SystemTime};
5 :
6 : use anyhow::Result;
7 : use futures::Future;
8 : use metrics::core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts};
9 : use metrics::proto::MetricFamily;
10 : use metrics::{
11 : DISK_FSYNC_SECONDS_BUCKETS, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
12 : IntCounterPair, IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, pow2_buckets,
13 : register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
14 : register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
15 : register_int_gauge_vec,
16 : };
17 : use once_cell::sync::Lazy;
18 : use postgres_ffi::XLogSegNo;
19 : use utils::id::TenantTimelineId;
20 : use utils::lsn::Lsn;
21 : use utils::pageserver_feedback::PageserverFeedback;
22 :
23 : use crate::GlobalTimelines;
24 : use crate::receive_wal::MSG_QUEUE_SIZE;
25 : use crate::state::{TimelineMemState, TimelinePersistentState};
26 :
27 : // Global metrics across all timelines.
28 5 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
29 5 : register_histogram!(
30 : "safekeeper_write_wal_bytes",
31 : "Bytes written to WAL in a single request",
32 5 : vec![
33 : 1.0,
34 : 10.0,
35 : 100.0,
36 : 1024.0,
37 : 8192.0,
38 5 : 128.0 * 1024.0,
39 5 : 1024.0 * 1024.0,
40 5 : 10.0 * 1024.0 * 1024.0
41 : ]
42 : )
43 5 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
44 5 : });
45 5 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
46 5 : register_histogram!(
47 : "safekeeper_write_wal_seconds",
48 : "Seconds spent writing and syncing WAL to a disk in a single request",
49 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
50 : )
51 5 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
52 5 : });
53 5 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
54 5 : register_histogram!(
55 : "safekeeper_flush_wal_seconds",
56 : "Seconds spent syncing WAL to a disk (excluding segment initialization)",
57 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
58 : )
59 5 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
60 5 : });
61 : /* BEGIN_HADRON */
62 : // Counter of all ProposerAcceptorMessage requests received
63 17 : pub static PROPOSER_ACCEPTOR_MESSAGES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
64 17 : register_int_counter_vec!(
65 : "safekeeper_proposer_acceptor_messages_total",
66 : "Total number of ProposerAcceptorMessage requests received by the Safekeeper.",
67 17 : &["outcome"]
68 : )
69 17 : .expect("Failed to register safekeeper_proposer_acceptor_messages_total counter")
70 17 : });
71 0 : pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
72 0 : register_int_counter!(
73 : "safekeeper_wal_disk_io_errors",
74 : "Number of disk I/O errors when creating and flushing WALs and control files"
75 : )
76 0 : .expect("Failed to register safekeeper_wal_disk_io_errors counter")
77 0 : });
78 0 : pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
79 0 : register_int_counter!(
80 : "safekeeper_wal_storage_limit_errors",
81 : concat!(
82 : "Number of errors due to timeline WAL storage utilization exceeding configured limit. ",
83 : "An increase in this metric indicates issues backing up or removing WALs."
84 : )
85 : )
86 0 : .expect("Failed to register safekeeper_wal_storage_limit_errors counter")
87 0 : });
88 : /* END_HADRON */
89 7 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
90 7 : register_histogram!(
91 : "safekeeper_persist_control_file_seconds",
92 : "Seconds to persist and sync control file",
93 7 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
94 : )
95 7 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
96 7 : });
97 5 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
98 5 : register_histogram_vec!(
99 : "safekeeper_wal_storage_operation_seconds",
100 : "Seconds spent on WAL storage operations",
101 5 : &["operation"],
102 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
103 : )
104 5 : .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
105 5 : });
106 15 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
107 15 : register_histogram_vec!(
108 : "safekeeper_misc_operation_seconds",
109 : "Seconds spent on miscellaneous operations",
110 15 : &["operation"],
111 15 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
112 : )
113 15 : .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
114 15 : });
115 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
116 0 : register_int_counter_vec!(
117 : "safekeeper_pg_io_bytes_total",
118 : "Bytes read from or written to any PostgreSQL connection",
119 0 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
120 : )
121 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
122 0 : });
123 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
124 0 : register_int_counter!(
125 : "safekeeper_broker_pushed_updates_total",
126 : "Number of timeline updates pushed to the broker"
127 : )
128 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
129 0 : });
130 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
131 0 : register_int_counter_vec!(
132 : "safekeeper_broker_pulled_updates_total",
133 : "Number of timeline updates pulled and processed from the broker",
134 0 : &["result"]
135 : )
136 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
137 0 : });
138 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
139 0 : register_int_counter_pair_vec!(
140 : "safekeeper_pg_queries_received_total",
141 : "Number of queries received through pg protocol",
142 : "safekeeper_pg_queries_finished_total",
143 : "Number of queries finished through pg protocol",
144 0 : &["query"]
145 : )
146 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
147 0 : });
148 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
149 0 : register_int_counter!(
150 : "safekeeper_removed_wal_segments_total",
151 : "Number of WAL segments removed from the disk"
152 : )
153 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
154 0 : });
155 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
156 0 : register_int_counter!(
157 : "safekeeper_backed_up_segments_total",
158 : "Number of WAL segments backed up to the S3"
159 : )
160 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
161 0 : });
162 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
163 0 : register_int_counter!(
164 : "safekeeper_backup_errors_total",
165 : "Number of errors during backup"
166 : )
167 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
168 0 : });
169 : /* BEGIN_HADRON */
170 0 : pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
171 0 : register_int_counter!(
172 : "safekeeper_backup_reelect_leader_total",
173 : "Number of times the backup leader was reelected"
174 : )
175 0 : .expect("Failed to register safekeeper_backup_reelect_leader_total counter")
176 0 : });
177 : /* END_HADRON */
178 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
179 0 : register_histogram!(
180 : "safekeeper_broker_push_update_seconds",
181 : "Seconds to push all timeline updates to the broker",
182 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
183 : )
184 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
185 0 : });
/// Histogram bucket upper bounds for metrics that count timelines, ranging
/// from 1 to 50000.
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
];
189 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
190 0 : register_histogram!(
191 : "safekeeper_broker_iteration_timelines",
192 : "Count of timelines pushed to the broker in a single iteration",
193 0 : TIMELINES_COUNT_BUCKETS.to_vec()
194 : )
195 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
196 0 : });
197 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
198 0 : register_int_counter!(
199 : "safekeeper_received_ps_feedbacks_total",
200 : "Number of pageserver feedbacks received"
201 : )
202 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
203 0 : });
204 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
205 0 : register_int_counter_vec!(
206 : "safekeeper_partial_backup_uploads_total",
207 : "Number of partial backup uploads to the S3",
208 0 : &["result"]
209 : )
210 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
211 0 : });
212 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
213 0 : register_int_counter!(
214 : "safekeeper_partial_backup_uploaded_bytes_total",
215 : "Number of bytes uploaded to the S3 during partial backup"
216 : )
217 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
218 0 : });
219 5 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
220 5 : register_int_counter!(
221 : "safekeeper_manager_iterations_total",
222 : "Number of iterations of the timeline manager task"
223 : )
224 5 : .expect("Failed to register safekeeper_manager_iterations_total counter")
225 5 : });
226 5 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
227 5 : register_int_counter!(
228 : "safekeeper_manager_active_changes_total",
229 : "Number of timeline active status changes in the timeline manager task"
230 : )
231 5 : .expect("Failed to register safekeeper_manager_active_changes_total counter")
232 5 : });
233 0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
234 0 : register_int_counter_pair!(
235 : "safekeeper_wal_backup_tasks_started_total",
236 : "Number of active WAL backup tasks",
237 : "safekeeper_wal_backup_tasks_finished_total",
238 : "Number of finished WAL backup tasks",
239 : )
240 0 : .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
241 0 : });
242 5 : pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
243 5 : register_int_gauge!(
244 : "safekeeper_wal_receivers",
245 : "Number of currently connected WAL receivers (i.e. connected computes)"
246 : )
247 5 : .expect("Failed to register safekeeper_wal_receivers")
248 5 : });
249 4 : pub static WAL_READERS: Lazy<IntGaugeVec> = Lazy::new(|| {
250 4 : register_int_gauge_vec!(
251 : "safekeeper_wal_readers",
252 : "Number of active WAL readers (may serve pageservers or other safekeepers)",
253 4 : &["kind", "target"]
254 : )
255 4 : .expect("Failed to register safekeeper_wal_receivers")
256 4 : });
257 5 : pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
258 : // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
259 : // full queues respectively.
260 5 : let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
261 5 : buckets.insert(0, 0.0);
262 5 : buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
263 5 : assert!(buckets.len() <= 12, "too many histogram buckets");
264 :
265 5 : register_histogram!(
266 : "safekeeper_wal_receiver_queue_depth",
267 : "Number of queued messages per WAL receiver (sampled every 5 seconds)",
268 5 : buckets
269 : )
270 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
271 5 : });
272 5 : pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
273 5 : register_int_gauge!(
274 : "safekeeper_wal_receiver_queue_depth_total",
275 : "Total number of queued messages across all WAL receivers",
276 : )
277 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
278 5 : });
279 : // TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
280 : // MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
281 5 : pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
282 5 : register_int_gauge!(
283 : "safekeeper_wal_receiver_queue_size_total",
284 : "Total memory byte size of queued messages across all WAL receivers",
285 : )
286 5 : .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
287 5 : });
288 :
289 : // Metrics collected on operations on the storage repository.
/// Kind of eviction state change.
///
/// The strum derives convert to and from strings using kebab-case variant
/// names. Presumably the string form is the `kind` label value of the
/// eviction counters below — confirm at the call sites.
#[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum EvictionEvent {
    /// An eviction.
    Evict,
    /// A restore.
    Restore,
}
296 :
297 0 : pub(crate) static EVICTION_EVENTS_STARTED: Lazy<IntCounterVec> = Lazy::new(|| {
298 0 : register_int_counter_vec!(
299 : "safekeeper_eviction_events_started_total",
300 : "Number of eviction state changes, incremented when they start",
301 0 : &["kind"]
302 : )
303 0 : .expect("Failed to register metric")
304 0 : });
305 :
306 0 : pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(|| {
307 0 : register_int_counter_vec!(
308 : "safekeeper_eviction_events_completed_total",
309 : "Number of eviction state changes, incremented when they complete",
310 0 : &["kind"]
311 : )
312 0 : .expect("Failed to register metric")
313 0 : });
314 :
315 0 : pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
316 0 : register_int_gauge!(
317 : "safekeeper_evicted_timelines",
318 : "Number of currently evicted timelines"
319 : )
320 0 : .expect("Failed to register metric")
321 0 : });
322 :
/// Placeholder label value used by the traffic metrics until the real value
/// (availability zone or application name) has been set.
pub const LABEL_UNKNOWN: &str = "unknown";
324 :
325 : /// Labels for traffic metrics.
326 : #[derive(Clone)]
327 : struct ConnectionLabels {
328 : /// Availability zone of the connection origin.
329 : client_az: String,
330 : /// Availability zone of the current safekeeper.
331 : sk_az: String,
332 : /// Client application name.
333 : app_name: String,
334 : }
335 :
336 : impl ConnectionLabels {
337 0 : fn new() -> Self {
338 0 : Self {
339 0 : client_az: LABEL_UNKNOWN.to_string(),
340 0 : sk_az: LABEL_UNKNOWN.to_string(),
341 0 : app_name: LABEL_UNKNOWN.to_string(),
342 0 : }
343 0 : }
344 :
345 0 : fn build_metrics(
346 0 : &self,
347 0 : ) -> (
348 0 : GenericCounter<metrics::core::AtomicU64>,
349 0 : GenericCounter<metrics::core::AtomicU64>,
350 0 : ) {
351 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
352 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
353 0 : (client_az, sk_az) => {
354 0 : if client_az == sk_az {
355 0 : "true"
356 : } else {
357 0 : "false"
358 : }
359 : }
360 : };
361 :
362 0 : let read = PG_IO_BYTES.with_label_values(&[
363 0 : &self.client_az,
364 0 : &self.sk_az,
365 0 : &self.app_name,
366 0 : "read",
367 0 : same_az,
368 0 : ]);
369 0 : let write = PG_IO_BYTES.with_label_values(&[
370 0 : &self.client_az,
371 0 : &self.sk_az,
372 0 : &self.app_name,
373 0 : "write",
374 0 : same_az,
375 0 : ]);
376 0 : (read, write)
377 0 : }
378 : }
379 :
/// Mutable state behind [`TrafficMetrics`]: the current labels plus the
/// counters that were looked up for those labels.
struct TrafficMetricsState {
    /// Labels for traffic metrics.
    labels: ConnectionLabels,
    /// Total bytes read from this connection.
    read: GenericCounter<metrics::core::AtomicU64>,
    /// Total bytes written to this connection.
    write: GenericCounter<metrics::core::AtomicU64>,
}
388 :
389 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
390 : #[derive(Clone)]
391 : pub struct TrafficMetrics {
392 : state: Arc<RwLock<TrafficMetricsState>>,
393 : }
394 :
395 : impl Default for TrafficMetrics {
396 0 : fn default() -> Self {
397 0 : Self::new()
398 0 : }
399 : }
400 :
401 : impl TrafficMetrics {
402 0 : pub fn new() -> Self {
403 0 : let labels = ConnectionLabels::new();
404 0 : let (read, write) = labels.build_metrics();
405 0 : let state = TrafficMetricsState {
406 0 : labels,
407 0 : read,
408 0 : write,
409 0 : };
410 0 : Self {
411 0 : state: Arc::new(RwLock::new(state)),
412 0 : }
413 0 : }
414 :
415 0 : pub fn set_client_az(&self, value: &str) {
416 0 : let mut state = self.state.write().unwrap();
417 0 : state.labels.client_az = value.to_string();
418 0 : (state.read, state.write) = state.labels.build_metrics();
419 0 : }
420 :
421 0 : pub fn set_sk_az(&self, value: &str) {
422 0 : let mut state = self.state.write().unwrap();
423 0 : state.labels.sk_az = value.to_string();
424 0 : (state.read, state.write) = state.labels.build_metrics();
425 0 : }
426 :
427 0 : pub fn set_app_name(&self, value: &str) {
428 0 : let mut state = self.state.write().unwrap();
429 0 : state.labels.app_name = value.to_string();
430 0 : (state.read, state.write) = state.labels.build_metrics();
431 0 : }
432 :
433 0 : pub fn observe_read(&self, cnt: usize) {
434 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
435 0 : }
436 :
437 0 : pub fn observe_write(&self, cnt: usize) {
438 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
439 0 : }
440 : }
441 :
442 : /// Metrics for WalStorage in a single timeline.
443 : #[derive(Clone, Default)]
444 : pub struct WalStorageMetrics {
445 : /// How much bytes were written in total.
446 : write_wal_bytes: u64,
447 : /// How much time spent writing WAL to disk, waiting for write(2).
448 : write_wal_seconds: f64,
449 : /// How much time spent syncing WAL to disk, waiting for fsync(2).
450 : flush_wal_seconds: f64,
451 : }
452 :
453 : impl WalStorageMetrics {
454 620 : pub fn observe_write_bytes(&mut self, bytes: usize) {
455 620 : self.write_wal_bytes += bytes as u64;
456 620 : WRITE_WAL_BYTES.observe(bytes as f64);
457 620 : }
458 :
459 620 : pub fn observe_write_seconds(&mut self, seconds: f64) {
460 620 : self.write_wal_seconds += seconds;
461 620 : WRITE_WAL_SECONDS.observe(seconds);
462 620 : }
463 :
464 625 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
465 625 : self.flush_wal_seconds += seconds;
466 625 : FLUSH_WAL_SECONDS.observe(seconds);
467 625 : }
468 : }
469 :
470 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
471 1245 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
472 1245 : closure: impl Future<Output = Result<(), E>>,
473 1245 : ) -> Result<f64> {
474 1245 : let start = std::time::Instant::now();
475 1245 : closure.await.map_err(|e| e.into())?;
476 1245 : Ok(start.elapsed().as_secs_f64())
477 1245 : }
478 :
/// Metrics for a single timeline.
///
/// This is the snapshot that `TimelineCollector::collect` turns into
/// per-timeline gauges.
#[derive(Clone)]
pub struct FullTimelineInfo {
    /// Tenant and timeline ids; used as the `tenant_id` / `timeline_id` labels.
    pub ttid: TenantTimelineId,
    /// Exported as `safekeeper_ps_feedback_count_total`.
    pub ps_feedback_count: u64,
    /// Last pageserver feedback; its `last_received_lsn` and `replytime` are exported.
    pub last_ps_feedback: PageserverFeedback,
    /// Exported as 0/1 in `safekeeper_wal_backup_active`.
    pub wal_backup_active: bool,
    /// Exported as 0/1 in `safekeeper_timeline_active`; also counted into
    /// `safekeeper_active_timelines`.
    pub timeline_is_active: bool,
    /// Exported as `safekeeper_connected_computes`.
    pub num_computes: u32,
    /// Used with `flush_lsn` to estimate disk usage; the estimate is skipped
    /// when this is 0.
    pub last_removed_segno: XLogSegNo,
    /// Exported as `safekeeper_interpreted_wal_reader_tasks`.
    pub interpreted_wal_reader_tasks: usize,

    /// Exported as `safekeeper_epoch_start_lsn`.
    pub epoch_start_lsn: Lsn,
    /// Source of the commit, backup, peer horizon and remote consistent LSN gauges.
    pub mem_state: TimelineMemState,
    /// Source of the acceptor term gauge and of the WAL segment size used for
    /// the disk usage estimate.
    pub persisted_state: TimelinePersistentState,

    /// Exported as `safekeeper_flush_lsn`.
    pub flush_lsn: Lsn,

    /// Accumulated WAL write/flush totals for this timeline.
    pub wal_storage: WalStorageMetrics,
}
499 :
/// Collects metrics for all active timelines.
///
/// The gauge vectors are owned by the collector rather than registered
/// globally; `collect` resets and refills them on each call. All `*Vec`
/// fields are labelled by `tenant_id` and `timeline_id`.
pub struct TimelineCollector {
    /// Source of the timelines to report on.
    global_timelines: Arc<GlobalTimelines>,
    /// Descriptors returned from `Collector::desc`.
    descs: Vec<Desc>,
    /// `safekeeper_commit_lsn`
    commit_lsn: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_backup_lsn`
    backup_lsn: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_flush_lsn`
    flush_lsn: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_epoch_start_lsn`
    epoch_start_lsn: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_peer_horizon_lsn`
    peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_remote_consistent_lsn`
    remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_ps_last_received_lsn`
    ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_feedback_last_time_seconds`
    feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_ps_feedback_count_total`
    ps_feedback_count: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_timeline_active`
    timeline_active: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_wal_backup_active`
    wal_backup_active: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_connected_computes`
    connected_computes: IntGaugeVec,
    /// `safekeeper_disk_usage_bytes`
    disk_usage: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_acceptor_term`
    acceptor_term: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_written_wal_bytes_total`
    written_wal_bytes: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_interpreted_wal_reader_tasks`
    interpreted_wal_reader_tasks: GenericGaugeVec<AtomicU64>,
    /// `safekeeper_written_wal_seconds_total`
    written_wal_seconds: GaugeVec,
    /// `safekeeper_flushed_wal_seconds_total`
    flushed_wal_seconds: GaugeVec,
    /// `safekeeper_collect_timeline_metrics_seconds` (no labels)
    collect_timeline_metrics: Gauge,
    /// `safekeeper_timelines` (no labels)
    timelines_count: IntGauge,
    /// `safekeeper_active_timelines` (no labels)
    active_timelines_count: IntGauge,
}
526 :
527 : impl TimelineCollector {
528 0 : pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
529 0 : let mut descs = Vec::new();
530 :
531 0 : let commit_lsn = GenericGaugeVec::new(
532 0 : Opts::new(
533 : "safekeeper_commit_lsn",
534 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
535 : ),
536 0 : &["tenant_id", "timeline_id"],
537 : )
538 0 : .unwrap();
539 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
540 :
541 0 : let backup_lsn = GenericGaugeVec::new(
542 0 : Opts::new(
543 : "safekeeper_backup_lsn",
544 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
545 : ),
546 0 : &["tenant_id", "timeline_id"],
547 : )
548 0 : .unwrap();
549 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
550 :
551 0 : let flush_lsn = GenericGaugeVec::new(
552 0 : Opts::new(
553 : "safekeeper_flush_lsn",
554 : "Current flush_lsn, grouped by timeline",
555 : ),
556 0 : &["tenant_id", "timeline_id"],
557 : )
558 0 : .unwrap();
559 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
560 :
561 0 : let epoch_start_lsn = GenericGaugeVec::new(
562 0 : Opts::new(
563 : "safekeeper_epoch_start_lsn",
564 : "Point since which compute generates new WAL in the current consensus term",
565 : ),
566 0 : &["tenant_id", "timeline_id"],
567 : )
568 0 : .unwrap();
569 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
570 :
571 0 : let peer_horizon_lsn = GenericGaugeVec::new(
572 0 : Opts::new(
573 : "safekeeper_peer_horizon_lsn",
574 : "LSN of the most lagging safekeeper",
575 : ),
576 0 : &["tenant_id", "timeline_id"],
577 : )
578 0 : .unwrap();
579 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
580 :
581 0 : let remote_consistent_lsn = GenericGaugeVec::new(
582 0 : Opts::new(
583 : "safekeeper_remote_consistent_lsn",
584 : "LSN which is persisted to the remote storage in pageserver",
585 : ),
586 0 : &["tenant_id", "timeline_id"],
587 : )
588 0 : .unwrap();
589 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
590 :
591 0 : let ps_last_received_lsn = GenericGaugeVec::new(
592 0 : Opts::new(
593 : "safekeeper_ps_last_received_lsn",
594 : "Last LSN received by the pageserver, acknowledged in the feedback",
595 : ),
596 0 : &["tenant_id", "timeline_id"],
597 : )
598 0 : .unwrap();
599 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
600 :
601 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
602 0 : Opts::new(
603 : "safekeeper_feedback_last_time_seconds",
604 : "Timestamp of the last feedback from the pageserver",
605 : ),
606 0 : &["tenant_id", "timeline_id"],
607 : )
608 0 : .unwrap();
609 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
610 :
611 0 : let ps_feedback_count = GenericGaugeVec::new(
612 0 : Opts::new(
613 : "safekeeper_ps_feedback_count_total",
614 : "Number of feedbacks received from the pageserver",
615 : ),
616 0 : &["tenant_id", "timeline_id"],
617 : )
618 0 : .unwrap();
619 :
620 0 : let timeline_active = GenericGaugeVec::new(
621 0 : Opts::new(
622 : "safekeeper_timeline_active",
623 : "Reports 1 for active timelines, 0 for inactive",
624 : ),
625 0 : &["tenant_id", "timeline_id"],
626 : )
627 0 : .unwrap();
628 0 : descs.extend(timeline_active.desc().into_iter().cloned());
629 :
630 0 : let wal_backup_active = GenericGaugeVec::new(
631 0 : Opts::new(
632 : "safekeeper_wal_backup_active",
633 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
634 : ),
635 0 : &["tenant_id", "timeline_id"],
636 : )
637 0 : .unwrap();
638 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
639 :
640 0 : let connected_computes = IntGaugeVec::new(
641 0 : Opts::new(
642 : "safekeeper_connected_computes",
643 : "Number of active compute connections",
644 : ),
645 0 : &["tenant_id", "timeline_id"],
646 : )
647 0 : .unwrap();
648 0 : descs.extend(connected_computes.desc().into_iter().cloned());
649 :
650 0 : let disk_usage = GenericGaugeVec::new(
651 0 : Opts::new(
652 : "safekeeper_disk_usage_bytes",
653 : "Estimated disk space used to store WAL segments",
654 : ),
655 0 : &["tenant_id", "timeline_id"],
656 : )
657 0 : .unwrap();
658 0 : descs.extend(disk_usage.desc().into_iter().cloned());
659 :
660 0 : let acceptor_term = GenericGaugeVec::new(
661 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
662 0 : &["tenant_id", "timeline_id"],
663 : )
664 0 : .unwrap();
665 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
666 :
667 0 : let written_wal_bytes = GenericGaugeVec::new(
668 0 : Opts::new(
669 : "safekeeper_written_wal_bytes_total",
670 : "Number of WAL bytes written to disk, grouped by timeline",
671 : ),
672 0 : &["tenant_id", "timeline_id"],
673 : )
674 0 : .unwrap();
675 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
676 :
677 0 : let written_wal_seconds = GaugeVec::new(
678 0 : Opts::new(
679 : "safekeeper_written_wal_seconds_total",
680 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
681 : ),
682 0 : &["tenant_id", "timeline_id"],
683 : )
684 0 : .unwrap();
685 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
686 :
687 0 : let flushed_wal_seconds = GaugeVec::new(
688 0 : Opts::new(
689 : "safekeeper_flushed_wal_seconds_total",
690 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
691 : ),
692 0 : &["tenant_id", "timeline_id"],
693 : )
694 0 : .unwrap();
695 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
696 :
697 0 : let collect_timeline_metrics = Gauge::new(
698 : "safekeeper_collect_timeline_metrics_seconds",
699 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
700 : )
701 0 : .unwrap();
702 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
703 :
704 0 : let timelines_count = IntGauge::new(
705 : "safekeeper_timelines",
706 : "Total number of timelines loaded in-memory",
707 : )
708 0 : .unwrap();
709 0 : descs.extend(timelines_count.desc().into_iter().cloned());
710 :
711 0 : let active_timelines_count = IntGauge::new(
712 : "safekeeper_active_timelines",
713 : "Total number of active timelines",
714 : )
715 0 : .unwrap();
716 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
717 :
718 0 : let interpreted_wal_reader_tasks = GenericGaugeVec::new(
719 0 : Opts::new(
720 : "safekeeper_interpreted_wal_reader_tasks",
721 : "Number of active interpreted wal reader tasks, grouped by timeline",
722 : ),
723 0 : &["tenant_id", "timeline_id"],
724 : )
725 0 : .unwrap();
726 0 : descs.extend(interpreted_wal_reader_tasks.desc().into_iter().cloned());
727 :
728 0 : TimelineCollector {
729 0 : global_timelines,
730 0 : descs,
731 0 : commit_lsn,
732 0 : backup_lsn,
733 0 : flush_lsn,
734 0 : epoch_start_lsn,
735 0 : peer_horizon_lsn,
736 0 : remote_consistent_lsn,
737 0 : ps_last_received_lsn,
738 0 : feedback_last_time_seconds,
739 0 : ps_feedback_count,
740 0 : timeline_active,
741 0 : wal_backup_active,
742 0 : connected_computes,
743 0 : disk_usage,
744 0 : acceptor_term,
745 0 : written_wal_bytes,
746 0 : written_wal_seconds,
747 0 : flushed_wal_seconds,
748 0 : collect_timeline_metrics,
749 0 : timelines_count,
750 0 : active_timelines_count,
751 0 : interpreted_wal_reader_tasks,
752 0 : }
753 0 : }
754 : }
755 :
756 : impl Collector for TimelineCollector {
757 0 : fn desc(&self) -> Vec<&Desc> {
758 0 : self.descs.iter().collect()
759 0 : }
760 :
761 0 : fn collect(&self) -> Vec<MetricFamily> {
762 0 : let start_collecting = Instant::now();
763 :
764 : // reset all metrics to clean up inactive timelines
765 0 : self.commit_lsn.reset();
766 0 : self.backup_lsn.reset();
767 0 : self.flush_lsn.reset();
768 0 : self.epoch_start_lsn.reset();
769 0 : self.peer_horizon_lsn.reset();
770 0 : self.remote_consistent_lsn.reset();
771 0 : self.ps_last_received_lsn.reset();
772 0 : self.feedback_last_time_seconds.reset();
773 0 : self.ps_feedback_count.reset();
774 0 : self.timeline_active.reset();
775 0 : self.wal_backup_active.reset();
776 0 : self.connected_computes.reset();
777 0 : self.disk_usage.reset();
778 0 : self.acceptor_term.reset();
779 0 : self.written_wal_bytes.reset();
780 0 : self.interpreted_wal_reader_tasks.reset();
781 0 : self.written_wal_seconds.reset();
782 0 : self.flushed_wal_seconds.reset();
783 :
784 0 : let timelines_count = self.global_timelines.get_all().len();
785 0 : let mut active_timelines_count = 0;
786 :
787 : // Prometheus Collector is sync, and data is stored under async lock. To
788 : // bridge the gap with a crutch, collect data in spawned thread with
789 : // local tokio runtime.
790 0 : let global_timelines = self.global_timelines.clone();
791 0 : let infos = std::thread::spawn(|| {
792 0 : let rt = tokio::runtime::Builder::new_current_thread()
793 0 : .build()
794 0 : .expect("failed to create rt");
795 0 : rt.block_on(collect_timeline_metrics(global_timelines))
796 0 : })
797 0 : .join()
798 0 : .expect("collect_timeline_metrics thread panicked");
799 :
800 0 : for tli in &infos {
801 0 : let tenant_id = tli.ttid.tenant_id.to_string();
802 0 : let timeline_id = tli.ttid.timeline_id.to_string();
803 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
804 :
805 0 : if tli.timeline_is_active {
806 0 : active_timelines_count += 1;
807 0 : }
808 :
809 0 : self.commit_lsn
810 0 : .with_label_values(labels)
811 0 : .set(tli.mem_state.commit_lsn.into());
812 0 : self.backup_lsn
813 0 : .with_label_values(labels)
814 0 : .set(tli.mem_state.backup_lsn.into());
815 0 : self.flush_lsn
816 0 : .with_label_values(labels)
817 0 : .set(tli.flush_lsn.into());
818 0 : self.epoch_start_lsn
819 0 : .with_label_values(labels)
820 0 : .set(tli.epoch_start_lsn.into());
821 0 : self.peer_horizon_lsn
822 0 : .with_label_values(labels)
823 0 : .set(tli.mem_state.peer_horizon_lsn.into());
824 0 : self.remote_consistent_lsn
825 0 : .with_label_values(labels)
826 0 : .set(tli.mem_state.remote_consistent_lsn.into());
827 0 : self.timeline_active
828 0 : .with_label_values(labels)
829 0 : .set(tli.timeline_is_active as u64);
830 0 : self.wal_backup_active
831 0 : .with_label_values(labels)
832 0 : .set(tli.wal_backup_active as u64);
833 0 : self.connected_computes
834 0 : .with_label_values(labels)
835 0 : .set(tli.num_computes as i64);
836 0 : self.acceptor_term
837 0 : .with_label_values(labels)
838 0 : .set(tli.persisted_state.acceptor_state.term);
839 0 : self.written_wal_bytes
840 0 : .with_label_values(labels)
841 0 : .set(tli.wal_storage.write_wal_bytes);
842 0 : self.interpreted_wal_reader_tasks
843 0 : .with_label_values(labels)
844 0 : .set(tli.interpreted_wal_reader_tasks as u64);
845 0 : self.written_wal_seconds
846 0 : .with_label_values(labels)
847 0 : .set(tli.wal_storage.write_wal_seconds);
848 0 : self.flushed_wal_seconds
849 0 : .with_label_values(labels)
850 0 : .set(tli.wal_storage.flush_wal_seconds);
851 :
852 0 : self.ps_last_received_lsn
853 0 : .with_label_values(labels)
854 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
855 0 : self.ps_feedback_count
856 0 : .with_label_values(labels)
857 0 : .set(tli.ps_feedback_count);
858 0 : if let Ok(unix_time) = tli
859 0 : .last_ps_feedback
860 0 : .replytime
861 0 : .duration_since(SystemTime::UNIX_EPOCH)
862 0 : {
863 0 : self.feedback_last_time_seconds
864 0 : .with_label_values(labels)
865 0 : .set(unix_time.as_secs());
866 0 : }
867 :
868 0 : if tli.last_removed_segno != 0 {
869 0 : let segno_count = tli
870 0 : .flush_lsn
871 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
872 0 : - tli.last_removed_segno;
873 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
874 0 : self.disk_usage
875 0 : .with_label_values(labels)
876 0 : .set(disk_usage_bytes);
877 0 : }
878 : }
879 :
880 : // collect MetricFamilys.
881 0 : let mut mfs = Vec::new();
882 0 : mfs.extend(self.commit_lsn.collect());
883 0 : mfs.extend(self.backup_lsn.collect());
884 0 : mfs.extend(self.flush_lsn.collect());
885 0 : mfs.extend(self.epoch_start_lsn.collect());
886 0 : mfs.extend(self.peer_horizon_lsn.collect());
887 0 : mfs.extend(self.remote_consistent_lsn.collect());
888 0 : mfs.extend(self.ps_last_received_lsn.collect());
889 0 : mfs.extend(self.feedback_last_time_seconds.collect());
890 0 : mfs.extend(self.ps_feedback_count.collect());
891 0 : mfs.extend(self.timeline_active.collect());
892 0 : mfs.extend(self.wal_backup_active.collect());
893 0 : mfs.extend(self.connected_computes.collect());
894 0 : mfs.extend(self.disk_usage.collect());
895 0 : mfs.extend(self.acceptor_term.collect());
896 0 : mfs.extend(self.written_wal_bytes.collect());
897 0 : mfs.extend(self.interpreted_wal_reader_tasks.collect());
898 0 : mfs.extend(self.written_wal_seconds.collect());
899 0 : mfs.extend(self.flushed_wal_seconds.collect());
900 :
901 : // report time it took to collect all info
902 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
903 0 : self.collect_timeline_metrics.set(elapsed);
904 0 : mfs.extend(self.collect_timeline_metrics.collect());
905 :
906 : // report total number of timelines
907 0 : self.timelines_count.set(timelines_count as i64);
908 0 : mfs.extend(self.timelines_count.collect());
909 :
910 0 : self.active_timelines_count
911 0 : .set(active_timelines_count as i64);
912 0 : mfs.extend(self.active_timelines_count.collect());
913 :
914 0 : mfs
915 0 : }
916 : }
917 :
918 0 : async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
919 0 : let mut res = vec![];
920 0 : let active_timelines = global_timelines.get_global_broker_active_set().get_all();
921 :
922 0 : for tli in active_timelines {
923 0 : if let Some(info) = tli.info_for_metrics().await {
924 0 : res.push(info);
925 0 : }
926 : }
927 0 : res
928 0 : }
|