Line data Source code
//! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::sync::{Arc, RwLock};
4 : use std::time::{Instant, SystemTime};
5 :
6 : use anyhow::Result;
7 : use futures::Future;
8 : use metrics::core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts};
9 : use metrics::proto::MetricFamily;
10 : use metrics::{
11 : DISK_FSYNC_SECONDS_BUCKETS, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
12 : IntCounterPair, IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, pow2_buckets,
13 : register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
14 : register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
15 : register_int_gauge_vec,
16 : };
17 : use once_cell::sync::Lazy;
18 : use postgres_ffi::XLogSegNo;
19 : use utils::id::TenantTimelineId;
20 : use utils::lsn::Lsn;
21 : use utils::pageserver_feedback::PageserverFeedback;
22 :
23 : use crate::GlobalTimelines;
24 : use crate::receive_wal::MSG_QUEUE_SIZE;
25 : use crate::state::{TimelineMemState, TimelinePersistentState};
26 :
27 : // Global metrics across all timelines.
28 5 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
29 5 : register_histogram!(
30 5 : "safekeeper_write_wal_bytes",
31 5 : "Bytes written to WAL in a single request",
32 5 : vec![
33 5 : 1.0,
34 5 : 10.0,
35 5 : 100.0,
36 5 : 1024.0,
37 5 : 8192.0,
38 5 : 128.0 * 1024.0,
39 5 : 1024.0 * 1024.0,
40 5 : 10.0 * 1024.0 * 1024.0
41 5 : ]
42 5 : )
43 5 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
44 5 : });
45 5 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
46 5 : register_histogram!(
47 5 : "safekeeper_write_wal_seconds",
48 5 : "Seconds spent writing and syncing WAL to a disk in a single request",
49 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
50 5 : )
51 5 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
52 5 : });
53 5 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
54 5 : register_histogram!(
55 5 : "safekeeper_flush_wal_seconds",
56 5 : "Seconds spent syncing WAL to a disk (excluding segment initialization)",
57 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
58 5 : )
59 5 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
60 5 : });
61 7 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
62 7 : register_histogram!(
63 7 : "safekeeper_persist_control_file_seconds",
64 7 : "Seconds to persist and sync control file",
65 7 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
66 7 : )
67 7 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
68 7 : });
69 5 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
70 5 : register_histogram_vec!(
71 5 : "safekeeper_wal_storage_operation_seconds",
72 5 : "Seconds spent on WAL storage operations",
73 5 : &["operation"],
74 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
75 5 : )
76 5 : .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
77 5 : });
78 15 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
79 15 : register_histogram_vec!(
80 15 : "safekeeper_misc_operation_seconds",
81 15 : "Seconds spent on miscellaneous operations",
82 15 : &["operation"],
83 15 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
84 15 : )
85 15 : .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
86 15 : });
87 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
88 0 : register_int_counter_vec!(
89 0 : "safekeeper_pg_io_bytes_total",
90 0 : "Bytes read from or written to any PostgreSQL connection",
91 0 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
92 0 : )
93 0 : .expect("Failed to register safekeeper_pg_io_bytes gauge")
94 0 : });
95 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
96 0 : register_int_counter!(
97 0 : "safekeeper_broker_pushed_updates_total",
98 0 : "Number of timeline updates pushed to the broker"
99 0 : )
100 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
101 0 : });
102 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
103 0 : register_int_counter_vec!(
104 0 : "safekeeper_broker_pulled_updates_total",
105 0 : "Number of timeline updates pulled and processed from the broker",
106 0 : &["result"]
107 0 : )
108 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
109 0 : });
110 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
111 0 : register_int_counter_pair_vec!(
112 0 : "safekeeper_pg_queries_received_total",
113 0 : "Number of queries received through pg protocol",
114 0 : "safekeeper_pg_queries_finished_total",
115 0 : "Number of queries finished through pg protocol",
116 0 : &["query"]
117 0 : )
118 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
119 0 : });
120 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
121 0 : register_int_counter!(
122 0 : "safekeeper_removed_wal_segments_total",
123 0 : "Number of WAL segments removed from the disk"
124 0 : )
125 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
126 0 : });
127 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
128 0 : register_int_counter!(
129 0 : "safekeeper_backed_up_segments_total",
130 0 : "Number of WAL segments backed up to the S3"
131 0 : )
132 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
133 0 : });
134 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
135 0 : register_int_counter!(
136 0 : "safekeeper_backup_errors_total",
137 0 : "Number of errors during backup"
138 0 : )
139 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
140 0 : });
141 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
142 0 : register_histogram!(
143 0 : "safekeeper_broker_push_update_seconds",
144 0 : "Seconds to push all timeline updates to the broker",
145 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
146 0 : )
147 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
148 0 : });
/// Histogram bucket boundaries for timeline counts, ranging from 1 to 50000.
///
/// Used as the buckets of `BROKER_ITERATION_TIMELINES`.
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
    1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
];
152 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
153 0 : register_histogram!(
154 0 : "safekeeper_broker_iteration_timelines",
155 0 : "Count of timelines pushed to the broker in a single iteration",
156 0 : TIMELINES_COUNT_BUCKETS.to_vec()
157 0 : )
158 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
159 0 : });
160 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
161 0 : register_int_counter!(
162 0 : "safekeeper_received_ps_feedbacks_total",
163 0 : "Number of pageserver feedbacks received"
164 0 : )
165 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
166 0 : });
167 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
168 0 : register_int_counter_vec!(
169 0 : "safekeeper_partial_backup_uploads_total",
170 0 : "Number of partial backup uploads to the S3",
171 0 : &["result"]
172 0 : )
173 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
174 0 : });
175 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
176 0 : register_int_counter!(
177 0 : "safekeeper_partial_backup_uploaded_bytes_total",
178 0 : "Number of bytes uploaded to the S3 during partial backup"
179 0 : )
180 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
181 0 : });
182 5 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
183 5 : register_int_counter!(
184 5 : "safekeeper_manager_iterations_total",
185 5 : "Number of iterations of the timeline manager task"
186 5 : )
187 5 : .expect("Failed to register safekeeper_manager_iterations_total counter")
188 5 : });
189 5 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
190 5 : register_int_counter!(
191 5 : "safekeeper_manager_active_changes_total",
192 5 : "Number of timeline active status changes in the timeline manager task"
193 5 : )
194 5 : .expect("Failed to register safekeeper_manager_active_changes_total counter")
195 5 : });
196 0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
197 0 : register_int_counter_pair!(
198 0 : "safekeeper_wal_backup_tasks_started_total",
199 0 : "Number of active WAL backup tasks",
200 0 : "safekeeper_wal_backup_tasks_finished_total",
201 0 : "Number of finished WAL backup tasks",
202 0 : )
203 0 : .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
204 0 : });
205 5 : pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
206 5 : register_int_gauge!(
207 5 : "safekeeper_wal_receivers",
208 5 : "Number of currently connected WAL receivers (i.e. connected computes)"
209 5 : )
210 5 : .expect("Failed to register safekeeper_wal_receivers")
211 5 : });
212 4 : pub static WAL_READERS: Lazy<IntGaugeVec> = Lazy::new(|| {
213 4 : register_int_gauge_vec!(
214 4 : "safekeeper_wal_readers",
215 4 : "Number of active WAL readers (may serve pageservers or other safekeepers)",
216 4 : &["kind", "target"]
217 4 : )
218 4 : .expect("Failed to register safekeeper_wal_receivers")
219 4 : });
220 5 : pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
221 5 : // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
222 5 : // full queues respectively.
223 5 : let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
224 5 : buckets.insert(0, 0.0);
225 5 : buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
226 5 : assert!(buckets.len() <= 12, "too many histogram buckets");
227 :
228 5 : register_histogram!(
229 5 : "safekeeper_wal_receiver_queue_depth",
230 5 : "Number of queued messages per WAL receiver (sampled every 5 seconds)",
231 5 : buckets
232 5 : )
233 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
234 5 : });
235 5 : pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
236 5 : register_int_gauge!(
237 5 : "safekeeper_wal_receiver_queue_depth_total",
238 5 : "Total number of queued messages across all WAL receivers",
239 5 : )
240 5 : .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
241 5 : });
242 : // TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
243 : // MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
244 5 : pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
245 5 : register_int_gauge!(
246 5 : "safekeeper_wal_receiver_queue_size_total",
247 5 : "Total memory byte size of queued messages across all WAL receivers",
248 5 : )
249 5 : .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
250 5 : });
251 :
// Metrics for timeline eviction and restore events.
/// Kind of eviction state change.
///
/// The strum derives provide string conversions; with `serialize_all = "kebab_case"` the
/// variants render as `evict` and `restore`.
// NOTE(review): presumably used as the `kind` label value of the eviction event counters —
// confirm at the call sites.
#[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum EvictionEvent {
    /// An eviction state change of kind "evict".
    Evict,
    /// An eviction state change of kind "restore".
    Restore,
}
259 :
260 0 : pub(crate) static EVICTION_EVENTS_STARTED: Lazy<IntCounterVec> = Lazy::new(|| {
261 0 : register_int_counter_vec!(
262 0 : "safekeeper_eviction_events_started_total",
263 0 : "Number of eviction state changes, incremented when they start",
264 0 : &["kind"]
265 0 : )
266 0 : .expect("Failed to register metric")
267 0 : });
268 :
269 0 : pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(|| {
270 0 : register_int_counter_vec!(
271 0 : "safekeeper_eviction_events_completed_total",
272 0 : "Number of eviction state changes, incremented when they complete",
273 0 : &["kind"]
274 0 : )
275 0 : .expect("Failed to register metric")
276 0 : });
277 :
278 0 : pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
279 0 : register_int_gauge!(
280 0 : "safekeeper_evicted_timelines",
281 0 : "Number of currently evicted timelines"
282 0 : )
283 0 : .expect("Failed to register metric")
284 0 : });
285 :
/// Placeholder label value used for traffic-metric labels that have not been set yet.
pub const LABEL_UNKNOWN: &str = "unknown";
287 :
288 : /// Labels for traffic metrics.
289 : #[derive(Clone)]
290 : struct ConnectionLabels {
291 : /// Availability zone of the connection origin.
292 : client_az: String,
293 : /// Availability zone of the current safekeeper.
294 : sk_az: String,
295 : /// Client application name.
296 : app_name: String,
297 : }
298 :
299 : impl ConnectionLabels {
300 0 : fn new() -> Self {
301 0 : Self {
302 0 : client_az: LABEL_UNKNOWN.to_string(),
303 0 : sk_az: LABEL_UNKNOWN.to_string(),
304 0 : app_name: LABEL_UNKNOWN.to_string(),
305 0 : }
306 0 : }
307 :
308 0 : fn build_metrics(
309 0 : &self,
310 0 : ) -> (
311 0 : GenericCounter<metrics::core::AtomicU64>,
312 0 : GenericCounter<metrics::core::AtomicU64>,
313 0 : ) {
314 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
315 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
316 0 : (client_az, sk_az) => {
317 0 : if client_az == sk_az {
318 0 : "true"
319 : } else {
320 0 : "false"
321 : }
322 : }
323 : };
324 :
325 0 : let read = PG_IO_BYTES.with_label_values(&[
326 0 : &self.client_az,
327 0 : &self.sk_az,
328 0 : &self.app_name,
329 0 : "read",
330 0 : same_az,
331 0 : ]);
332 0 : let write = PG_IO_BYTES.with_label_values(&[
333 0 : &self.client_az,
334 0 : &self.sk_az,
335 0 : &self.app_name,
336 0 : "write",
337 0 : same_az,
338 0 : ]);
339 0 : (read, write)
340 0 : }
341 : }
342 :
343 : struct TrafficMetricsState {
344 : /// Labels for traffic metrics.
345 : labels: ConnectionLabels,
346 : /// Total bytes read from this connection.
347 : read: GenericCounter<metrics::core::AtomicU64>,
348 : /// Total bytes written to this connection.
349 : write: GenericCounter<metrics::core::AtomicU64>,
350 : }
351 :
352 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
353 : #[derive(Clone)]
354 : pub struct TrafficMetrics {
355 : state: Arc<RwLock<TrafficMetricsState>>,
356 : }
357 :
358 : impl Default for TrafficMetrics {
359 0 : fn default() -> Self {
360 0 : Self::new()
361 0 : }
362 : }
363 :
364 : impl TrafficMetrics {
365 0 : pub fn new() -> Self {
366 0 : let labels = ConnectionLabels::new();
367 0 : let (read, write) = labels.build_metrics();
368 0 : let state = TrafficMetricsState {
369 0 : labels,
370 0 : read,
371 0 : write,
372 0 : };
373 0 : Self {
374 0 : state: Arc::new(RwLock::new(state)),
375 0 : }
376 0 : }
377 :
378 0 : pub fn set_client_az(&self, value: &str) {
379 0 : let mut state = self.state.write().unwrap();
380 0 : state.labels.client_az = value.to_string();
381 0 : (state.read, state.write) = state.labels.build_metrics();
382 0 : }
383 :
384 0 : pub fn set_sk_az(&self, value: &str) {
385 0 : let mut state = self.state.write().unwrap();
386 0 : state.labels.sk_az = value.to_string();
387 0 : (state.read, state.write) = state.labels.build_metrics();
388 0 : }
389 :
390 0 : pub fn set_app_name(&self, value: &str) {
391 0 : let mut state = self.state.write().unwrap();
392 0 : state.labels.app_name = value.to_string();
393 0 : (state.read, state.write) = state.labels.build_metrics();
394 0 : }
395 :
396 0 : pub fn observe_read(&self, cnt: usize) {
397 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
398 0 : }
399 :
400 0 : pub fn observe_write(&self, cnt: usize) {
401 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
402 0 : }
403 : }
404 :
405 : /// Metrics for WalStorage in a single timeline.
406 : #[derive(Clone, Default)]
407 : pub struct WalStorageMetrics {
408 : /// How much bytes were written in total.
409 : write_wal_bytes: u64,
410 : /// How much time spent writing WAL to disk, waiting for write(2).
411 : write_wal_seconds: f64,
412 : /// How much time spent syncing WAL to disk, waiting for fsync(2).
413 : flush_wal_seconds: f64,
414 : }
415 :
416 : impl WalStorageMetrics {
417 620 : pub fn observe_write_bytes(&mut self, bytes: usize) {
418 620 : self.write_wal_bytes += bytes as u64;
419 620 : WRITE_WAL_BYTES.observe(bytes as f64);
420 620 : }
421 :
422 620 : pub fn observe_write_seconds(&mut self, seconds: f64) {
423 620 : self.write_wal_seconds += seconds;
424 620 : WRITE_WAL_SECONDS.observe(seconds);
425 620 : }
426 :
427 625 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
428 625 : self.flush_wal_seconds += seconds;
429 625 : FLUSH_WAL_SECONDS.observe(seconds);
430 625 : }
431 : }
432 :
433 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
434 1245 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
435 1245 : closure: impl Future<Output = Result<(), E>>,
436 1245 : ) -> Result<f64> {
437 1245 : let start = std::time::Instant::now();
438 1245 : closure.await.map_err(|e| e.into())?;
439 1245 : Ok(start.elapsed().as_secs_f64())
440 1245 : }
441 :
/// Metrics for a single timeline.
///
/// A snapshot of one timeline's state, consumed by `TimelineCollector::collect` to populate
/// the per-timeline gauges.
#[derive(Clone)]
pub struct FullTimelineInfo {
    /// Tenant and timeline ids; exported as the `tenant_id` / `timeline_id` labels.
    pub ttid: TenantTimelineId,
    /// Exported as `safekeeper_ps_feedback_count_total`.
    pub ps_feedback_count: u64,
    /// Source of the `last_received_lsn` and `replytime` values exported by the collector.
    pub last_ps_feedback: PageserverFeedback,
    /// Exported as `safekeeper_wal_backup_active` (1 or 0).
    pub wal_backup_active: bool,
    /// Exported as `safekeeper_timeline_active` (1 or 0) and counted into the number of
    /// active timelines.
    pub timeline_is_active: bool,
    /// Exported as `safekeeper_connected_computes`.
    pub num_computes: u32,
    /// Last removed WAL segment number; 0 makes the collector skip the disk usage estimate.
    pub last_removed_segno: XLogSegNo,
    /// Exported as `safekeeper_interpreted_wal_reader_tasks`.
    pub interpreted_wal_reader_tasks: usize,

    /// Exported as `safekeeper_epoch_start_lsn`.
    pub epoch_start_lsn: Lsn,
    /// In-memory timeline state; source of the commit, backup, peer horizon and remote
    /// consistent LSNs exported by the collector.
    pub mem_state: TimelineMemState,
    /// Persistent timeline state; source of the acceptor term and the WAL segment size.
    pub persisted_state: TimelinePersistentState,

    /// Exported as `safekeeper_flush_lsn`; also used for the disk usage estimate.
    pub flush_lsn: Lsn,

    /// Accumulated WAL storage metrics (bytes written, write and flush seconds).
    pub wal_storage: WalStorageMetrics,
}
462 :
/// Collects metrics for all active timelines.
///
/// Implements the Prometheus `Collector` trait. On each `collect()` call every per-timeline
/// gauge vec below is reset and re-populated from fresh timeline snapshots, labelled by
/// `tenant_id` and `timeline_id`.
pub struct TimelineCollector {
    /// Source of the timelines whose metrics are reported.
    global_timelines: Arc<GlobalTimelines>,
    /// Metric descriptors gathered in `new()` and returned by `Collector::desc()`.
    descs: Vec<Desc>,
    // Per-timeline gauge vecs, labelled by `tenant_id` and `timeline_id`.
    commit_lsn: GenericGaugeVec<AtomicU64>,
    backup_lsn: GenericGaugeVec<AtomicU64>,
    flush_lsn: GenericGaugeVec<AtomicU64>,
    epoch_start_lsn: GenericGaugeVec<AtomicU64>,
    peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
    remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
    ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
    feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
    ps_feedback_count: GenericGaugeVec<AtomicU64>,
    timeline_active: GenericGaugeVec<AtomicU64>,
    wal_backup_active: GenericGaugeVec<AtomicU64>,
    connected_computes: IntGaugeVec,
    disk_usage: GenericGaugeVec<AtomicU64>,
    acceptor_term: GenericGaugeVec<AtomicU64>,
    written_wal_bytes: GenericGaugeVec<AtomicU64>,
    interpreted_wal_reader_tasks: GenericGaugeVec<AtomicU64>,
    written_wal_seconds: GaugeVec,
    flushed_wal_seconds: GaugeVec,
    // Unlabelled gauges describing the collection itself.
    /// Time spent in the last `collect()` call, in seconds.
    collect_timeline_metrics: Gauge,
    /// Total number of timelines loaded in memory.
    timelines_count: IntGauge,
    /// Number of timelines reported as active during the last `collect()` call.
    active_timelines_count: IntGauge,
}
489 :
490 : impl TimelineCollector {
491 0 : pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
492 0 : let mut descs = Vec::new();
493 0 :
494 0 : let commit_lsn = GenericGaugeVec::new(
495 0 : Opts::new(
496 0 : "safekeeper_commit_lsn",
497 0 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
498 0 : ),
499 0 : &["tenant_id", "timeline_id"],
500 0 : )
501 0 : .unwrap();
502 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
503 0 :
504 0 : let backup_lsn = GenericGaugeVec::new(
505 0 : Opts::new(
506 0 : "safekeeper_backup_lsn",
507 0 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
508 0 : ),
509 0 : &["tenant_id", "timeline_id"],
510 0 : )
511 0 : .unwrap();
512 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
513 0 :
514 0 : let flush_lsn = GenericGaugeVec::new(
515 0 : Opts::new(
516 0 : "safekeeper_flush_lsn",
517 0 : "Current flush_lsn, grouped by timeline",
518 0 : ),
519 0 : &["tenant_id", "timeline_id"],
520 0 : )
521 0 : .unwrap();
522 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
523 0 :
524 0 : let epoch_start_lsn = GenericGaugeVec::new(
525 0 : Opts::new(
526 0 : "safekeeper_epoch_start_lsn",
527 0 : "Point since which compute generates new WAL in the current consensus term",
528 0 : ),
529 0 : &["tenant_id", "timeline_id"],
530 0 : )
531 0 : .unwrap();
532 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
533 0 :
534 0 : let peer_horizon_lsn = GenericGaugeVec::new(
535 0 : Opts::new(
536 0 : "safekeeper_peer_horizon_lsn",
537 0 : "LSN of the most lagging safekeeper",
538 0 : ),
539 0 : &["tenant_id", "timeline_id"],
540 0 : )
541 0 : .unwrap();
542 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
543 0 :
544 0 : let remote_consistent_lsn = GenericGaugeVec::new(
545 0 : Opts::new(
546 0 : "safekeeper_remote_consistent_lsn",
547 0 : "LSN which is persisted to the remote storage in pageserver",
548 0 : ),
549 0 : &["tenant_id", "timeline_id"],
550 0 : )
551 0 : .unwrap();
552 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
553 0 :
554 0 : let ps_last_received_lsn = GenericGaugeVec::new(
555 0 : Opts::new(
556 0 : "safekeeper_ps_last_received_lsn",
557 0 : "Last LSN received by the pageserver, acknowledged in the feedback",
558 0 : ),
559 0 : &["tenant_id", "timeline_id"],
560 0 : )
561 0 : .unwrap();
562 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
563 0 :
564 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
565 0 : Opts::new(
566 0 : "safekeeper_feedback_last_time_seconds",
567 0 : "Timestamp of the last feedback from the pageserver",
568 0 : ),
569 0 : &["tenant_id", "timeline_id"],
570 0 : )
571 0 : .unwrap();
572 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
573 0 :
574 0 : let ps_feedback_count = GenericGaugeVec::new(
575 0 : Opts::new(
576 0 : "safekeeper_ps_feedback_count_total",
577 0 : "Number of feedbacks received from the pageserver",
578 0 : ),
579 0 : &["tenant_id", "timeline_id"],
580 0 : )
581 0 : .unwrap();
582 0 :
583 0 : let timeline_active = GenericGaugeVec::new(
584 0 : Opts::new(
585 0 : "safekeeper_timeline_active",
586 0 : "Reports 1 for active timelines, 0 for inactive",
587 0 : ),
588 0 : &["tenant_id", "timeline_id"],
589 0 : )
590 0 : .unwrap();
591 0 : descs.extend(timeline_active.desc().into_iter().cloned());
592 0 :
593 0 : let wal_backup_active = GenericGaugeVec::new(
594 0 : Opts::new(
595 0 : "safekeeper_wal_backup_active",
596 0 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
597 0 : ),
598 0 : &["tenant_id", "timeline_id"],
599 0 : )
600 0 : .unwrap();
601 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
602 0 :
603 0 : let connected_computes = IntGaugeVec::new(
604 0 : Opts::new(
605 0 : "safekeeper_connected_computes",
606 0 : "Number of active compute connections",
607 0 : ),
608 0 : &["tenant_id", "timeline_id"],
609 0 : )
610 0 : .unwrap();
611 0 : descs.extend(connected_computes.desc().into_iter().cloned());
612 0 :
613 0 : let disk_usage = GenericGaugeVec::new(
614 0 : Opts::new(
615 0 : "safekeeper_disk_usage_bytes",
616 0 : "Estimated disk space used to store WAL segments",
617 0 : ),
618 0 : &["tenant_id", "timeline_id"],
619 0 : )
620 0 : .unwrap();
621 0 : descs.extend(disk_usage.desc().into_iter().cloned());
622 0 :
623 0 : let acceptor_term = GenericGaugeVec::new(
624 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
625 0 : &["tenant_id", "timeline_id"],
626 0 : )
627 0 : .unwrap();
628 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
629 0 :
630 0 : let written_wal_bytes = GenericGaugeVec::new(
631 0 : Opts::new(
632 0 : "safekeeper_written_wal_bytes_total",
633 0 : "Number of WAL bytes written to disk, grouped by timeline",
634 0 : ),
635 0 : &["tenant_id", "timeline_id"],
636 0 : )
637 0 : .unwrap();
638 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
639 0 :
640 0 : let written_wal_seconds = GaugeVec::new(
641 0 : Opts::new(
642 0 : "safekeeper_written_wal_seconds_total",
643 0 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
644 0 : ),
645 0 : &["tenant_id", "timeline_id"],
646 0 : )
647 0 : .unwrap();
648 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
649 0 :
650 0 : let flushed_wal_seconds = GaugeVec::new(
651 0 : Opts::new(
652 0 : "safekeeper_flushed_wal_seconds_total",
653 0 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
654 0 : ),
655 0 : &["tenant_id", "timeline_id"],
656 0 : )
657 0 : .unwrap();
658 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
659 0 :
660 0 : let collect_timeline_metrics = Gauge::new(
661 0 : "safekeeper_collect_timeline_metrics_seconds",
662 0 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
663 0 : )
664 0 : .unwrap();
665 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
666 0 :
667 0 : let timelines_count = IntGauge::new(
668 0 : "safekeeper_timelines",
669 0 : "Total number of timelines loaded in-memory",
670 0 : )
671 0 : .unwrap();
672 0 : descs.extend(timelines_count.desc().into_iter().cloned());
673 0 :
674 0 : let active_timelines_count = IntGauge::new(
675 0 : "safekeeper_active_timelines",
676 0 : "Total number of active timelines",
677 0 : )
678 0 : .unwrap();
679 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
680 0 :
681 0 : let interpreted_wal_reader_tasks = GenericGaugeVec::new(
682 0 : Opts::new(
683 0 : "safekeeper_interpreted_wal_reader_tasks",
684 0 : "Number of active interpreted wal reader tasks, grouped by timeline",
685 0 : ),
686 0 : &["tenant_id", "timeline_id"],
687 0 : )
688 0 : .unwrap();
689 0 : descs.extend(interpreted_wal_reader_tasks.desc().into_iter().cloned());
690 0 :
691 0 : TimelineCollector {
692 0 : global_timelines,
693 0 : descs,
694 0 : commit_lsn,
695 0 : backup_lsn,
696 0 : flush_lsn,
697 0 : epoch_start_lsn,
698 0 : peer_horizon_lsn,
699 0 : remote_consistent_lsn,
700 0 : ps_last_received_lsn,
701 0 : feedback_last_time_seconds,
702 0 : ps_feedback_count,
703 0 : timeline_active,
704 0 : wal_backup_active,
705 0 : connected_computes,
706 0 : disk_usage,
707 0 : acceptor_term,
708 0 : written_wal_bytes,
709 0 : written_wal_seconds,
710 0 : flushed_wal_seconds,
711 0 : collect_timeline_metrics,
712 0 : timelines_count,
713 0 : active_timelines_count,
714 0 : interpreted_wal_reader_tasks,
715 0 : }
716 0 : }
717 : }
718 :
719 : impl Collector for TimelineCollector {
720 0 : fn desc(&self) -> Vec<&Desc> {
721 0 : self.descs.iter().collect()
722 0 : }
723 :
724 0 : fn collect(&self) -> Vec<MetricFamily> {
725 0 : let start_collecting = Instant::now();
726 0 :
727 0 : // reset all metrics to clean up inactive timelines
728 0 : self.commit_lsn.reset();
729 0 : self.backup_lsn.reset();
730 0 : self.flush_lsn.reset();
731 0 : self.epoch_start_lsn.reset();
732 0 : self.peer_horizon_lsn.reset();
733 0 : self.remote_consistent_lsn.reset();
734 0 : self.ps_last_received_lsn.reset();
735 0 : self.feedback_last_time_seconds.reset();
736 0 : self.ps_feedback_count.reset();
737 0 : self.timeline_active.reset();
738 0 : self.wal_backup_active.reset();
739 0 : self.connected_computes.reset();
740 0 : self.disk_usage.reset();
741 0 : self.acceptor_term.reset();
742 0 : self.written_wal_bytes.reset();
743 0 : self.interpreted_wal_reader_tasks.reset();
744 0 : self.written_wal_seconds.reset();
745 0 : self.flushed_wal_seconds.reset();
746 0 :
747 0 : let timelines_count = self.global_timelines.get_all().len();
748 0 : let mut active_timelines_count = 0;
749 0 :
750 0 : // Prometheus Collector is sync, and data is stored under async lock. To
751 0 : // bridge the gap with a crutch, collect data in spawned thread with
752 0 : // local tokio runtime.
753 0 : let global_timelines = self.global_timelines.clone();
754 0 : let infos = std::thread::spawn(|| {
755 0 : let rt = tokio::runtime::Builder::new_current_thread()
756 0 : .build()
757 0 : .expect("failed to create rt");
758 0 : rt.block_on(collect_timeline_metrics(global_timelines))
759 0 : })
760 0 : .join()
761 0 : .expect("collect_timeline_metrics thread panicked");
762 :
763 0 : for tli in &infos {
764 0 : let tenant_id = tli.ttid.tenant_id.to_string();
765 0 : let timeline_id = tli.ttid.timeline_id.to_string();
766 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
767 0 :
768 0 : if tli.timeline_is_active {
769 0 : active_timelines_count += 1;
770 0 : }
771 :
772 0 : self.commit_lsn
773 0 : .with_label_values(labels)
774 0 : .set(tli.mem_state.commit_lsn.into());
775 0 : self.backup_lsn
776 0 : .with_label_values(labels)
777 0 : .set(tli.mem_state.backup_lsn.into());
778 0 : self.flush_lsn
779 0 : .with_label_values(labels)
780 0 : .set(tli.flush_lsn.into());
781 0 : self.epoch_start_lsn
782 0 : .with_label_values(labels)
783 0 : .set(tli.epoch_start_lsn.into());
784 0 : self.peer_horizon_lsn
785 0 : .with_label_values(labels)
786 0 : .set(tli.mem_state.peer_horizon_lsn.into());
787 0 : self.remote_consistent_lsn
788 0 : .with_label_values(labels)
789 0 : .set(tli.mem_state.remote_consistent_lsn.into());
790 0 : self.timeline_active
791 0 : .with_label_values(labels)
792 0 : .set(tli.timeline_is_active as u64);
793 0 : self.wal_backup_active
794 0 : .with_label_values(labels)
795 0 : .set(tli.wal_backup_active as u64);
796 0 : self.connected_computes
797 0 : .with_label_values(labels)
798 0 : .set(tli.num_computes as i64);
799 0 : self.acceptor_term
800 0 : .with_label_values(labels)
801 0 : .set(tli.persisted_state.acceptor_state.term);
802 0 : self.written_wal_bytes
803 0 : .with_label_values(labels)
804 0 : .set(tli.wal_storage.write_wal_bytes);
805 0 : self.interpreted_wal_reader_tasks
806 0 : .with_label_values(labels)
807 0 : .set(tli.interpreted_wal_reader_tasks as u64);
808 0 : self.written_wal_seconds
809 0 : .with_label_values(labels)
810 0 : .set(tli.wal_storage.write_wal_seconds);
811 0 : self.flushed_wal_seconds
812 0 : .with_label_values(labels)
813 0 : .set(tli.wal_storage.flush_wal_seconds);
814 0 :
815 0 : self.ps_last_received_lsn
816 0 : .with_label_values(labels)
817 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
818 0 : self.ps_feedback_count
819 0 : .with_label_values(labels)
820 0 : .set(tli.ps_feedback_count);
821 0 : if let Ok(unix_time) = tli
822 0 : .last_ps_feedback
823 0 : .replytime
824 0 : .duration_since(SystemTime::UNIX_EPOCH)
825 0 : {
826 0 : self.feedback_last_time_seconds
827 0 : .with_label_values(labels)
828 0 : .set(unix_time.as_secs());
829 0 : }
830 :
831 0 : if tli.last_removed_segno != 0 {
832 0 : let segno_count = tli
833 0 : .flush_lsn
834 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
835 0 : - tli.last_removed_segno;
836 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
837 0 : self.disk_usage
838 0 : .with_label_values(labels)
839 0 : .set(disk_usage_bytes);
840 0 : }
841 : }
842 :
843 : // collect MetricFamilys.
844 0 : let mut mfs = Vec::new();
845 0 : mfs.extend(self.commit_lsn.collect());
846 0 : mfs.extend(self.backup_lsn.collect());
847 0 : mfs.extend(self.flush_lsn.collect());
848 0 : mfs.extend(self.epoch_start_lsn.collect());
849 0 : mfs.extend(self.peer_horizon_lsn.collect());
850 0 : mfs.extend(self.remote_consistent_lsn.collect());
851 0 : mfs.extend(self.ps_last_received_lsn.collect());
852 0 : mfs.extend(self.feedback_last_time_seconds.collect());
853 0 : mfs.extend(self.ps_feedback_count.collect());
854 0 : mfs.extend(self.timeline_active.collect());
855 0 : mfs.extend(self.wal_backup_active.collect());
856 0 : mfs.extend(self.connected_computes.collect());
857 0 : mfs.extend(self.disk_usage.collect());
858 0 : mfs.extend(self.acceptor_term.collect());
859 0 : mfs.extend(self.written_wal_bytes.collect());
860 0 : mfs.extend(self.interpreted_wal_reader_tasks.collect());
861 0 : mfs.extend(self.written_wal_seconds.collect());
862 0 : mfs.extend(self.flushed_wal_seconds.collect());
863 0 :
864 0 : // report time it took to collect all info
865 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
866 0 : self.collect_timeline_metrics.set(elapsed);
867 0 : mfs.extend(self.collect_timeline_metrics.collect());
868 0 :
869 0 : // report total number of timelines
870 0 : self.timelines_count.set(timelines_count as i64);
871 0 : mfs.extend(self.timelines_count.collect());
872 0 :
873 0 : self.active_timelines_count
874 0 : .set(active_timelines_count as i64);
875 0 : mfs.extend(self.active_timelines_count.collect());
876 0 :
877 0 : mfs
878 0 : }
879 : }
880 :
881 0 : async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
882 0 : let mut res = vec![];
883 0 : let active_timelines = global_timelines.get_global_broker_active_set().get_all();
884 :
885 0 : for tli in active_timelines {
886 0 : if let Some(info) = tli.info_for_metrics().await {
887 0 : res.push(info);
888 0 : }
889 : }
890 0 : res
891 0 : }
|