Line data Source code
1 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
2 :
3 : use std::{
4 : sync::{Arc, RwLock},
5 : time::{Instant, SystemTime},
6 : };
7 :
8 : use anyhow::Result;
9 : use futures::Future;
10 : use metrics::{
11 : core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
12 : pow2_buckets,
13 : proto::MetricFamily,
14 : register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
15 : register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
16 : register_int_gauge_vec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
17 : IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, DISK_FSYNC_SECONDS_BUCKETS,
18 : };
19 : use once_cell::sync::Lazy;
20 : use postgres_ffi::XLogSegNo;
21 : use utils::{id::TenantTimelineId, lsn::Lsn, pageserver_feedback::PageserverFeedback};
22 :
23 : use crate::{
24 : receive_wal::MSG_QUEUE_SIZE,
25 : state::{TimelineMemState, TimelinePersistentState},
26 : GlobalTimelines,
27 : };
28 :
29 : // Global metrics across all timelines.
30 3 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
31 3 : register_histogram!(
32 3 : "safekeeper_write_wal_bytes",
33 3 : "Bytes written to WAL in a single request",
34 3 : vec![
35 3 : 1.0,
36 3 : 10.0,
37 3 : 100.0,
38 3 : 1024.0,
39 3 : 8192.0,
40 3 : 128.0 * 1024.0,
41 3 : 1024.0 * 1024.0,
42 3 : 10.0 * 1024.0 * 1024.0
43 3 : ]
44 3 : )
45 3 : .expect("Failed to register safekeeper_write_wal_bytes histogram")
46 3 : });
47 3 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
48 3 : register_histogram!(
49 3 : "safekeeper_write_wal_seconds",
50 3 : "Seconds spent writing and syncing WAL to a disk in a single request",
51 3 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
52 3 : )
53 3 : .expect("Failed to register safekeeper_write_wal_seconds histogram")
54 3 : });
55 3 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
56 3 : register_histogram!(
57 3 : "safekeeper_flush_wal_seconds",
58 3 : "Seconds spent syncing WAL to a disk (excluding segment initialization)",
59 3 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
60 3 : )
61 3 : .expect("Failed to register safekeeper_flush_wal_seconds histogram")
62 3 : });
63 5 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
64 5 : register_histogram!(
65 5 : "safekeeper_persist_control_file_seconds",
66 5 : "Seconds to persist and sync control file",
67 5 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
68 5 : )
69 5 : .expect("Failed to register safekeeper_persist_control_file_seconds histogram")
70 5 : });
71 3 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
72 3 : register_histogram_vec!(
73 3 : "safekeeper_wal_storage_operation_seconds",
74 3 : "Seconds spent on WAL storage operations",
75 3 : &["operation"],
76 3 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
77 3 : )
78 3 : .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
79 3 : });
80 13 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
81 13 : register_histogram_vec!(
82 13 : "safekeeper_misc_operation_seconds",
83 13 : "Seconds spent on miscellaneous operations",
84 13 : &["operation"],
85 13 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
86 13 : )
87 13 : .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
88 13 : });
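// Illustrative sketch (not part of the original file): how a caller might record a
// miscellaneous operation into MISC_OPERATION_SECONDS. The "load_timeline" label
// value is a hypothetical example, not an operation name taken from this crate.
#[allow(dead_code)]
fn example_observe_misc_operation() {
    let started = Instant::now();
    // ... perform the operation being measured ...
    MISC_OPERATION_SECONDS
        .with_label_values(&["load_timeline"])
        .observe(started.elapsed().as_secs_f64());
}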
89 0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
90 0 : register_int_counter_vec!(
91 0 : "safekeeper_pg_io_bytes_total",
92 0 : "Bytes read from or written to any PostgreSQL connection",
93 0 : &["client_az", "sk_az", "app_name", "dir", "same_az"]
94 0 : )
95 0 : .expect("Failed to register safekeeper_pg_io_bytes_total counter vec")
96 0 : });
97 0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
98 0 : register_int_counter!(
99 0 : "safekeeper_broker_pushed_updates_total",
100 0 : "Number of timeline updates pushed to the broker"
101 0 : )
102 0 : .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
103 0 : });
104 0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
105 0 : register_int_counter_vec!(
106 0 : "safekeeper_broker_pulled_updates_total",
107 0 : "Number of timeline updates pulled and processed from the broker",
108 0 : &["result"]
109 0 : )
110 0 : .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
111 0 : });
112 0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
113 0 : register_int_counter_pair_vec!(
114 0 : "safekeeper_pg_queries_received_total",
115 0 : "Number of queries received through pg protocol",
116 0 : "safekeeper_pg_queries_finished_total",
117 0 : "Number of queries finished through pg protocol",
118 0 : &["query"]
119 0 : )
120 0 : .expect("Failed to register safekeeper_pg_queries_finished_total counter")
121 0 : });
122 0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
123 0 : register_int_counter!(
124 0 : "safekeeper_removed_wal_segments_total",
125 0 : "Number of WAL segments removed from the disk"
126 0 : )
127 0 : .expect("Failed to register safekeeper_removed_wal_segments_total counter")
128 0 : });
129 0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
130 0 : register_int_counter!(
131 0 : "safekeeper_backed_up_segments_total",
132 0 : "Number of WAL segments backed up to S3"
133 0 : )
134 0 : .expect("Failed to register safekeeper_backed_up_segments_total counter")
135 0 : });
136 0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
137 0 : register_int_counter!(
138 0 : "safekeeper_backup_errors_total",
139 0 : "Number of errors during backup"
140 0 : )
141 0 : .expect("Failed to register safekeeper_backup_errors_total counter")
142 0 : });
143 0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
144 0 : register_histogram!(
145 0 : "safekeeper_broker_push_update_seconds",
146 0 : "Seconds to push all timeline updates to the broker",
147 0 : DISK_FSYNC_SECONDS_BUCKETS.to_vec()
148 0 : )
149 0 : .expect("Failed to register safekeeper_broker_push_update_seconds histogram")
150 0 : });
151 : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
152 : 1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
153 : ];
154 0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
155 0 : register_histogram!(
156 0 : "safekeeper_broker_iteration_timelines",
157 0 : "Count of timelines pushed to the broker in a single iteration",
158 0 : TIMELINES_COUNT_BUCKETS.to_vec()
159 0 : )
160 0 : .expect("Failed to register safekeeper_broker_iteration_timelines histogram")
161 0 : });
162 0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
163 0 : register_int_counter!(
164 0 : "safekeeper_received_ps_feedbacks_total",
165 0 : "Number of pageserver feedbacks received"
166 0 : )
167 0 : .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
168 0 : });
169 0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
170 0 : register_int_counter_vec!(
171 0 : "safekeeper_partial_backup_uploads_total",
172 0 : "Number of partial backup uploads to S3",
173 0 : &["result"]
174 0 : )
175 0 : .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
176 0 : });
177 0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
178 0 : register_int_counter!(
179 0 : "safekeeper_partial_backup_uploaded_bytes_total",
180 0 : "Number of bytes uploaded to S3 during partial backup"
181 0 : )
182 0 : .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
183 0 : });
184 3 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
185 3 : register_int_counter!(
186 3 : "safekeeper_manager_iterations_total",
187 3 : "Number of iterations of the timeline manager task"
188 3 : )
189 3 : .expect("Failed to register safekeeper_manager_iterations_total counter")
190 3 : });
191 3 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
192 3 : register_int_counter!(
193 3 : "safekeeper_manager_active_changes_total",
194 3 : "Number of timeline active status changes in the timeline manager task"
195 3 : )
196 3 : .expect("Failed to register safekeeper_manager_active_changes_total counter")
197 3 : });
198 0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
199 0 : register_int_counter_pair!(
200 0 : "safekeeper_wal_backup_tasks_started_total",
201 0 : "Number of active WAL backup tasks",
202 0 : "safekeeper_wal_backup_tasks_finished_total",
203 0 : "Number of finished WAL backup tasks",
204 0 : )
205 0 : .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
206 0 : });
207 3 : pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
208 3 : register_int_gauge!(
209 3 : "safekeeper_wal_receivers",
210 3 : "Number of currently connected WAL receivers (i.e. connected computes)"
211 3 : )
212 3 : .expect("Failed to register safekeeper_wal_receivers")
213 3 : });
214 2 : pub static WAL_READERS: Lazy<IntGaugeVec> = Lazy::new(|| {
215 2 : register_int_gauge_vec!(
216 2 : "safekeeper_wal_readers",
217 2 : "Number of active WAL readers (may serve pageservers or other safekeepers)",
218 2 : &["kind", "target"]
219 2 : )
220 2 : .expect("Failed to register safekeeper_wal_readers")
221 2 : });
222 3 : pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
223 3 : // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
224 3 : // full queues respectively.
225 3 : let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
226 3 : buckets.insert(0, 0.0);
227 3 : buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
228 3 : assert!(buckets.len() <= 12, "too many histogram buckets");
229 :
230 3 : register_histogram!(
231 3 : "safekeeper_wal_receiver_queue_depth",
232 3 : "Number of queued messages per WAL receiver (sampled every 5 seconds)",
233 3 : buckets
234 3 : )
235 3 : .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
236 3 : });
237 3 : pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
238 3 : register_int_gauge!(
239 3 : "safekeeper_wal_receiver_queue_depth_total",
240 3 : "Total number of queued messages across all WAL receivers",
241 3 : )
242 3 : .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
243 3 : });
244 : // TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
245 : // MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
246 3 : pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
247 3 : register_int_gauge!(
248 3 : "safekeeper_wal_receiver_queue_size_total",
249 3 : "Total memory byte size of queued messages across all WAL receivers",
250 3 : )
251 3 : .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
252 3 : });
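// Illustrative sketch (not part of the original file) of the channel wrapper
// mentioned in the TODO above: a Tokio MPSC sender that bumps the global
// queue-depth gauge on every successful send. A full implementation would also
// decrement on receive/drop and forward the remaining Sender methods.
#[allow(dead_code)]
struct CountedSender<T> {
    inner: tokio::sync::mpsc::Sender<T>,
}

#[allow(dead_code)]
impl<T> CountedSender<T> {
    async fn send(&self, msg: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
        self.inner.send(msg).await?;
        WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
        Ok(())
    }
}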
253 :
254 : // Metrics for timeline eviction and restore events.
255 0 : #[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
256 : #[strum(serialize_all = "kebab_case")]
257 : pub(crate) enum EvictionEvent {
258 : Evict,
259 : Restore,
260 : }
261 :
262 0 : pub(crate) static EVICTION_EVENTS_STARTED: Lazy<IntCounterVec> = Lazy::new(|| {
263 0 : register_int_counter_vec!(
264 0 : "safekeeper_eviction_events_started_total",
265 0 : "Number of eviction state changes, incremented when they start",
266 0 : &["kind"]
267 0 : )
268 0 : .expect("Failed to register metric")
269 0 : });
270 :
271 0 : pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(|| {
272 0 : register_int_counter_vec!(
273 0 : "safekeeper_eviction_events_completed_total",
274 0 : "Number of eviction state changes, incremented when they complete",
275 0 : &["kind"]
276 0 : )
277 0 : .expect("Failed to register metric")
278 0 : });
279 :
280 0 : pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
281 0 : register_int_gauge!(
282 0 : "safekeeper_evicted_timelines",
283 0 : "Number of currently evicted timelines"
284 0 : )
285 0 : .expect("Failed to register metric")
286 0 : });
287 :
288 : pub const LABEL_UNKNOWN: &str = "unknown";
289 :
290 : /// Labels for traffic metrics.
291 : #[derive(Clone)]
292 : struct ConnectionLabels {
293 : /// Availability zone of the connection origin.
294 : client_az: String,
295 : /// Availability zone of the current safekeeper.
296 : sk_az: String,
297 : /// Client application name.
298 : app_name: String,
299 : }
300 :
301 : impl ConnectionLabels {
302 0 : fn new() -> Self {
303 0 : Self {
304 0 : client_az: LABEL_UNKNOWN.to_string(),
305 0 : sk_az: LABEL_UNKNOWN.to_string(),
306 0 : app_name: LABEL_UNKNOWN.to_string(),
307 0 : }
308 0 : }
309 :
310 0 : fn build_metrics(
311 0 : &self,
312 0 : ) -> (
313 0 : GenericCounter<metrics::core::AtomicU64>,
314 0 : GenericCounter<metrics::core::AtomicU64>,
315 0 : ) {
316 0 : let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
317 0 : (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
318 0 : (client_az, sk_az) => {
319 0 : if client_az == sk_az {
320 0 : "true"
321 : } else {
322 0 : "false"
323 : }
324 : }
325 : };
326 :
327 0 : let read = PG_IO_BYTES.with_label_values(&[
328 0 : &self.client_az,
329 0 : &self.sk_az,
330 0 : &self.app_name,
331 0 : "read",
332 0 : same_az,
333 0 : ]);
334 0 : let write = PG_IO_BYTES.with_label_values(&[
335 0 : &self.client_az,
336 0 : &self.sk_az,
337 0 : &self.app_name,
338 0 : "write",
339 0 : same_az,
340 0 : ]);
341 0 : (read, write)
342 0 : }
343 : }
344 :
345 : struct TrafficMetricsState {
346 : /// Labels for traffic metrics.
347 : labels: ConnectionLabels,
348 : /// Total bytes read from this connection.
349 : read: GenericCounter<metrics::core::AtomicU64>,
350 : /// Total bytes written to this connection.
351 : write: GenericCounter<metrics::core::AtomicU64>,
352 : }
353 :
354 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
355 : #[derive(Clone)]
356 : pub struct TrafficMetrics {
357 : state: Arc<RwLock<TrafficMetricsState>>,
358 : }
359 :
360 : impl Default for TrafficMetrics {
361 0 : fn default() -> Self {
362 0 : Self::new()
363 0 : }
364 : }
365 :
366 : impl TrafficMetrics {
367 0 : pub fn new() -> Self {
368 0 : let labels = ConnectionLabels::new();
369 0 : let (read, write) = labels.build_metrics();
370 0 : let state = TrafficMetricsState {
371 0 : labels,
372 0 : read,
373 0 : write,
374 0 : };
375 0 : Self {
376 0 : state: Arc::new(RwLock::new(state)),
377 0 : }
378 0 : }
379 :
380 0 : pub fn set_client_az(&self, value: &str) {
381 0 : let mut state = self.state.write().unwrap();
382 0 : state.labels.client_az = value.to_string();
383 0 : (state.read, state.write) = state.labels.build_metrics();
384 0 : }
385 :
386 0 : pub fn set_sk_az(&self, value: &str) {
387 0 : let mut state = self.state.write().unwrap();
388 0 : state.labels.sk_az = value.to_string();
389 0 : (state.read, state.write) = state.labels.build_metrics();
390 0 : }
391 :
392 0 : pub fn set_app_name(&self, value: &str) {
393 0 : let mut state = self.state.write().unwrap();
394 0 : state.labels.app_name = value.to_string();
395 0 : (state.read, state.write) = state.labels.build_metrics();
396 0 : }
397 :
398 0 : pub fn observe_read(&self, cnt: usize) {
399 0 : self.state.read().unwrap().read.inc_by(cnt as u64)
400 0 : }
401 :
402 0 : pub fn observe_write(&self, cnt: usize) {
403 0 : self.state.read().unwrap().write.inc_by(cnt as u64)
404 0 : }
405 : }
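// Illustrative sketch (not part of the original file): how a connection handler
// might wire up TrafficMetrics. The label values ("us-east-1a", "walproposer")
// are hypothetical examples.
#[allow(dead_code)]
fn example_traffic_metrics() {
    let metrics = TrafficMetrics::new();
    metrics.set_client_az("us-east-1a");
    metrics.set_sk_az("us-east-1a");
    metrics.set_app_name("walproposer");
    // Account for bytes moved over the connection; these feed PG_IO_BYTES with
    // the labels set above.
    metrics.observe_read(1024);
    metrics.observe_write(512);
}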
406 :
407 : /// Metrics for WalStorage in a single timeline.
408 : #[derive(Clone, Default)]
409 : pub struct WalStorageMetrics {
410 : /// How many bytes were written in total.
411 : write_wal_bytes: u64,
412 : /// How much time was spent writing WAL to disk, waiting for write(2).
413 : write_wal_seconds: f64,
414 : /// How much time was spent syncing WAL to disk, waiting for fsync(2).
415 : flush_wal_seconds: f64,
416 : }
417 :
418 : impl WalStorageMetrics {
419 600 : pub fn observe_write_bytes(&mut self, bytes: usize) {
420 600 : self.write_wal_bytes += bytes as u64;
421 600 : WRITE_WAL_BYTES.observe(bytes as f64);
422 600 : }
423 :
424 600 : pub fn observe_write_seconds(&mut self, seconds: f64) {
425 600 : self.write_wal_seconds += seconds;
426 600 : WRITE_WAL_SECONDS.observe(seconds);
427 600 : }
428 :
429 603 : pub fn observe_flush_seconds(&mut self, seconds: f64) {
430 603 : self.flush_wal_seconds += seconds;
431 603 : FLUSH_WAL_SECONDS.observe(seconds);
432 603 : }
433 : }
434 :
435 : /// Accepts an async function that returns an empty anyhow result, and returns the duration of its execution in seconds.
436 1203 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
437 1203 : closure: impl Future<Output = Result<(), E>>,
438 1203 : ) -> Result<f64> {
439 1203 : let start = std::time::Instant::now();
440 1203 : closure.await.map_err(|e| e.into())?;
441 1203 : Ok(start.elapsed().as_secs_f64())
442 1203 : }
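// Illustrative sketch (not part of the original file): timing an async disk write
// with `time_io_closure` and feeding the result into WalStorageMetrics. The
// `file` and `buf` parameters are hypothetical stand-ins for real WAL storage state.
#[allow(dead_code)]
async fn example_time_wal_write(
    metrics: &mut WalStorageMetrics,
    file: &mut tokio::fs::File,
    buf: &[u8],
) -> Result<()> {
    use tokio::io::AsyncWriteExt;
    let seconds = time_io_closure(file.write_all(buf)).await?;
    metrics.observe_write_bytes(buf.len());
    metrics.observe_write_seconds(seconds);
    Ok(())
}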
443 :
444 : /// Metrics for a single timeline.
445 : #[derive(Clone)]
446 : pub struct FullTimelineInfo {
447 : pub ttid: TenantTimelineId,
448 : pub ps_feedback_count: u64,
449 : pub last_ps_feedback: PageserverFeedback,
450 : pub wal_backup_active: bool,
451 : pub timeline_is_active: bool,
452 : pub num_computes: u32,
453 : pub last_removed_segno: XLogSegNo,
454 : pub interpreted_wal_reader_tasks: usize,
455 :
456 : pub epoch_start_lsn: Lsn,
457 : pub mem_state: TimelineMemState,
458 : pub persisted_state: TimelinePersistentState,
459 :
460 : pub flush_lsn: Lsn,
461 :
462 : pub wal_storage: WalStorageMetrics,
463 : }
464 :
465 : /// Collects metrics for all active timelines.
466 : pub struct TimelineCollector {
467 : global_timelines: Arc<GlobalTimelines>,
468 : descs: Vec<Desc>,
469 : commit_lsn: GenericGaugeVec<AtomicU64>,
470 : backup_lsn: GenericGaugeVec<AtomicU64>,
471 : flush_lsn: GenericGaugeVec<AtomicU64>,
472 : epoch_start_lsn: GenericGaugeVec<AtomicU64>,
473 : peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
474 : remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
475 : ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
476 : feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
477 : ps_feedback_count: GenericGaugeVec<AtomicU64>,
478 : timeline_active: GenericGaugeVec<AtomicU64>,
479 : wal_backup_active: GenericGaugeVec<AtomicU64>,
480 : connected_computes: IntGaugeVec,
481 : disk_usage: GenericGaugeVec<AtomicU64>,
482 : acceptor_term: GenericGaugeVec<AtomicU64>,
483 : written_wal_bytes: GenericGaugeVec<AtomicU64>,
484 : interpreted_wal_reader_tasks: GenericGaugeVec<AtomicU64>,
485 : written_wal_seconds: GaugeVec,
486 : flushed_wal_seconds: GaugeVec,
487 : collect_timeline_metrics: Gauge,
488 : timelines_count: IntGauge,
489 : active_timelines_count: IntGauge,
490 : }
491 :
492 : impl TimelineCollector {
493 0 : pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
494 0 : let mut descs = Vec::new();
495 0 :
496 0 : let commit_lsn = GenericGaugeVec::new(
497 0 : Opts::new(
498 0 : "safekeeper_commit_lsn",
499 0 : "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
500 0 : ),
501 0 : &["tenant_id", "timeline_id"],
502 0 : )
503 0 : .unwrap();
504 0 : descs.extend(commit_lsn.desc().into_iter().cloned());
505 0 :
506 0 : let backup_lsn = GenericGaugeVec::new(
507 0 : Opts::new(
508 0 : "safekeeper_backup_lsn",
509 0 : "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
510 0 : ),
511 0 : &["tenant_id", "timeline_id"],
512 0 : )
513 0 : .unwrap();
514 0 : descs.extend(backup_lsn.desc().into_iter().cloned());
515 0 :
516 0 : let flush_lsn = GenericGaugeVec::new(
517 0 : Opts::new(
518 0 : "safekeeper_flush_lsn",
519 0 : "Current flush_lsn, grouped by timeline",
520 0 : ),
521 0 : &["tenant_id", "timeline_id"],
522 0 : )
523 0 : .unwrap();
524 0 : descs.extend(flush_lsn.desc().into_iter().cloned());
525 0 :
526 0 : let epoch_start_lsn = GenericGaugeVec::new(
527 0 : Opts::new(
528 0 : "safekeeper_epoch_start_lsn",
529 0 : "Point since which compute generates new WAL in the current consensus term",
530 0 : ),
531 0 : &["tenant_id", "timeline_id"],
532 0 : )
533 0 : .unwrap();
534 0 : descs.extend(epoch_start_lsn.desc().into_iter().cloned());
535 0 :
536 0 : let peer_horizon_lsn = GenericGaugeVec::new(
537 0 : Opts::new(
538 0 : "safekeeper_peer_horizon_lsn",
539 0 : "LSN of the most lagging safekeeper",
540 0 : ),
541 0 : &["tenant_id", "timeline_id"],
542 0 : )
543 0 : .unwrap();
544 0 : descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
545 0 :
546 0 : let remote_consistent_lsn = GenericGaugeVec::new(
547 0 : Opts::new(
548 0 : "safekeeper_remote_consistent_lsn",
549 0 : "LSN which is persisted to the remote storage in pageserver",
550 0 : ),
551 0 : &["tenant_id", "timeline_id"],
552 0 : )
553 0 : .unwrap();
554 0 : descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
555 0 :
556 0 : let ps_last_received_lsn = GenericGaugeVec::new(
557 0 : Opts::new(
558 0 : "safekeeper_ps_last_received_lsn",
559 0 : "Last LSN received by the pageserver, acknowledged in the feedback",
560 0 : ),
561 0 : &["tenant_id", "timeline_id"],
562 0 : )
563 0 : .unwrap();
564 0 : descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
565 0 :
566 0 : let feedback_last_time_seconds = GenericGaugeVec::new(
567 0 : Opts::new(
568 0 : "safekeeper_feedback_last_time_seconds",
569 0 : "Timestamp of the last feedback from the pageserver",
570 0 : ),
571 0 : &["tenant_id", "timeline_id"],
572 0 : )
573 0 : .unwrap();
574 0 : descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
575 0 :
576 0 : let ps_feedback_count = GenericGaugeVec::new(
577 0 : Opts::new(
578 0 : "safekeeper_ps_feedback_count_total",
579 0 : "Number of feedbacks received from the pageserver",
580 0 : ),
581 0 : &["tenant_id", "timeline_id"],
582 0 : )
583 0 : .unwrap();
584 0 :
585 0 : let timeline_active = GenericGaugeVec::new(
586 0 : Opts::new(
587 0 : "safekeeper_timeline_active",
588 0 : "Reports 1 for active timelines, 0 for inactive",
589 0 : ),
590 0 : &["tenant_id", "timeline_id"],
591 0 : )
592 0 : .unwrap();
593 0 : descs.extend(timeline_active.desc().into_iter().cloned());
594 0 :
595 0 : let wal_backup_active = GenericGaugeVec::new(
596 0 : Opts::new(
597 0 : "safekeeper_wal_backup_active",
598 0 : "Reports 1 for timelines with active WAL backup, 0 otherwise",
599 0 : ),
600 0 : &["tenant_id", "timeline_id"],
601 0 : )
602 0 : .unwrap();
603 0 : descs.extend(wal_backup_active.desc().into_iter().cloned());
604 0 :
605 0 : let connected_computes = IntGaugeVec::new(
606 0 : Opts::new(
607 0 : "safekeeper_connected_computes",
608 0 : "Number of active compute connections",
609 0 : ),
610 0 : &["tenant_id", "timeline_id"],
611 0 : )
612 0 : .unwrap();
613 0 : descs.extend(connected_computes.desc().into_iter().cloned());
614 0 :
615 0 : let disk_usage = GenericGaugeVec::new(
616 0 : Opts::new(
617 0 : "safekeeper_disk_usage_bytes",
618 0 : "Estimated disk space used to store WAL segments",
619 0 : ),
620 0 : &["tenant_id", "timeline_id"],
621 0 : )
622 0 : .unwrap();
623 0 : descs.extend(disk_usage.desc().into_iter().cloned());
624 0 :
625 0 : let acceptor_term = GenericGaugeVec::new(
626 0 : Opts::new("safekeeper_acceptor_term", "Current consensus term"),
627 0 : &["tenant_id", "timeline_id"],
628 0 : )
629 0 : .unwrap();
630 0 : descs.extend(acceptor_term.desc().into_iter().cloned());
631 0 :
632 0 : let written_wal_bytes = GenericGaugeVec::new(
633 0 : Opts::new(
634 0 : "safekeeper_written_wal_bytes_total",
635 0 : "Number of WAL bytes written to disk, grouped by timeline",
636 0 : ),
637 0 : &["tenant_id", "timeline_id"],
638 0 : )
639 0 : .unwrap();
640 0 : descs.extend(written_wal_bytes.desc().into_iter().cloned());
641 0 :
642 0 : let written_wal_seconds = GaugeVec::new(
643 0 : Opts::new(
644 0 : "safekeeper_written_wal_seconds_total",
645 0 : "Total time spent in write(2) writing WAL to disk, grouped by timeline",
646 0 : ),
647 0 : &["tenant_id", "timeline_id"],
648 0 : )
649 0 : .unwrap();
650 0 : descs.extend(written_wal_seconds.desc().into_iter().cloned());
651 0 :
652 0 : let flushed_wal_seconds = GaugeVec::new(
653 0 : Opts::new(
654 0 : "safekeeper_flushed_wal_seconds_total",
655 0 : "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
656 0 : ),
657 0 : &["tenant_id", "timeline_id"],
658 0 : )
659 0 : .unwrap();
660 0 : descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
661 0 :
662 0 : let collect_timeline_metrics = Gauge::new(
663 0 : "safekeeper_collect_timeline_metrics_seconds",
664 0 : "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
665 0 : )
666 0 : .unwrap();
667 0 : descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
668 0 :
669 0 : let timelines_count = IntGauge::new(
670 0 : "safekeeper_timelines",
671 0 : "Total number of timelines loaded in-memory",
672 0 : )
673 0 : .unwrap();
674 0 : descs.extend(timelines_count.desc().into_iter().cloned());
675 0 :
676 0 : let active_timelines_count = IntGauge::new(
677 0 : "safekeeper_active_timelines",
678 0 : "Total number of active timelines",
679 0 : )
680 0 : .unwrap();
681 0 : descs.extend(active_timelines_count.desc().into_iter().cloned());
682 0 :
683 0 : let interpreted_wal_reader_tasks = GenericGaugeVec::new(
684 0 : Opts::new(
685 0 : "safekeeper_interpreted_wal_reader_tasks",
686 0 : "Number of active interpreted WAL reader tasks, grouped by timeline",
687 0 : ),
688 0 : &["tenant_id", "timeline_id"],
689 0 : )
690 0 : .unwrap();
691 0 : descs.extend(interpreted_wal_reader_tasks.desc().into_iter().cloned());
692 0 :
693 0 : TimelineCollector {
694 0 : global_timelines,
695 0 : descs,
696 0 : commit_lsn,
697 0 : backup_lsn,
698 0 : flush_lsn,
699 0 : epoch_start_lsn,
700 0 : peer_horizon_lsn,
701 0 : remote_consistent_lsn,
702 0 : ps_last_received_lsn,
703 0 : feedback_last_time_seconds,
704 0 : ps_feedback_count,
705 0 : timeline_active,
706 0 : wal_backup_active,
707 0 : connected_computes,
708 0 : disk_usage,
709 0 : acceptor_term,
710 0 : written_wal_bytes,
711 0 : written_wal_seconds,
712 0 : flushed_wal_seconds,
713 0 : collect_timeline_metrics,
714 0 : timelines_count,
715 0 : active_timelines_count,
716 0 : interpreted_wal_reader_tasks,
717 0 : }
718 0 : }
719 : }
720 :
721 : impl Collector for TimelineCollector {
722 0 : fn desc(&self) -> Vec<&Desc> {
723 0 : self.descs.iter().collect()
724 0 : }
725 :
726 0 : fn collect(&self) -> Vec<MetricFamily> {
727 0 : let start_collecting = Instant::now();
728 0 :
729 0 : // reset all metrics to clean up inactive timelines
730 0 : self.commit_lsn.reset();
731 0 : self.backup_lsn.reset();
732 0 : self.flush_lsn.reset();
733 0 : self.epoch_start_lsn.reset();
734 0 : self.peer_horizon_lsn.reset();
735 0 : self.remote_consistent_lsn.reset();
736 0 : self.ps_last_received_lsn.reset();
737 0 : self.feedback_last_time_seconds.reset();
738 0 : self.ps_feedback_count.reset();
739 0 : self.timeline_active.reset();
740 0 : self.wal_backup_active.reset();
741 0 : self.connected_computes.reset();
742 0 : self.disk_usage.reset();
743 0 : self.acceptor_term.reset();
744 0 : self.written_wal_bytes.reset();
745 0 : self.interpreted_wal_reader_tasks.reset();
746 0 : self.written_wal_seconds.reset();
747 0 : self.flushed_wal_seconds.reset();
748 0 :
749 0 : let timelines_count = self.global_timelines.get_all().len();
750 0 : let mut active_timelines_count = 0;
751 0 :
752 0 : // Prometheus Collector is sync, but the data is stored under an async lock.
753 0 : // To bridge the gap, collect the data in a spawned thread running a local
754 0 : // tokio runtime.
755 0 : let global_timelines = self.global_timelines.clone();
756 0 : let infos = std::thread::spawn(|| {
757 0 : let rt = tokio::runtime::Builder::new_current_thread()
758 0 : .build()
759 0 : .expect("failed to create rt");
760 0 : rt.block_on(collect_timeline_metrics(global_timelines))
761 0 : })
762 0 : .join()
763 0 : .expect("collect_timeline_metrics thread panicked");
764 :
765 0 : for tli in &infos {
766 0 : let tenant_id = tli.ttid.tenant_id.to_string();
767 0 : let timeline_id = tli.ttid.timeline_id.to_string();
768 0 : let labels = &[tenant_id.as_str(), timeline_id.as_str()];
769 0 :
770 0 : if tli.timeline_is_active {
771 0 : active_timelines_count += 1;
772 0 : }
773 :
774 0 : self.commit_lsn
775 0 : .with_label_values(labels)
776 0 : .set(tli.mem_state.commit_lsn.into());
777 0 : self.backup_lsn
778 0 : .with_label_values(labels)
779 0 : .set(tli.mem_state.backup_lsn.into());
780 0 : self.flush_lsn
781 0 : .with_label_values(labels)
782 0 : .set(tli.flush_lsn.into());
783 0 : self.epoch_start_lsn
784 0 : .with_label_values(labels)
785 0 : .set(tli.epoch_start_lsn.into());
786 0 : self.peer_horizon_lsn
787 0 : .with_label_values(labels)
788 0 : .set(tli.mem_state.peer_horizon_lsn.into());
789 0 : self.remote_consistent_lsn
790 0 : .with_label_values(labels)
791 0 : .set(tli.mem_state.remote_consistent_lsn.into());
792 0 : self.timeline_active
793 0 : .with_label_values(labels)
794 0 : .set(tli.timeline_is_active as u64);
795 0 : self.wal_backup_active
796 0 : .with_label_values(labels)
797 0 : .set(tli.wal_backup_active as u64);
798 0 : self.connected_computes
799 0 : .with_label_values(labels)
800 0 : .set(tli.num_computes as i64);
801 0 : self.acceptor_term
802 0 : .with_label_values(labels)
803 0 : .set(tli.persisted_state.acceptor_state.term);
804 0 : self.written_wal_bytes
805 0 : .with_label_values(labels)
806 0 : .set(tli.wal_storage.write_wal_bytes);
807 0 : self.interpreted_wal_reader_tasks
808 0 : .with_label_values(labels)
809 0 : .set(tli.interpreted_wal_reader_tasks as u64);
810 0 : self.written_wal_seconds
811 0 : .with_label_values(labels)
812 0 : .set(tli.wal_storage.write_wal_seconds);
813 0 : self.flushed_wal_seconds
814 0 : .with_label_values(labels)
815 0 : .set(tli.wal_storage.flush_wal_seconds);
816 0 :
817 0 : self.ps_last_received_lsn
818 0 : .with_label_values(labels)
819 0 : .set(tli.last_ps_feedback.last_received_lsn.0);
820 0 : self.ps_feedback_count
821 0 : .with_label_values(labels)
822 0 : .set(tli.ps_feedback_count);
823 0 : if let Ok(unix_time) = tli
824 0 : .last_ps_feedback
825 0 : .replytime
826 0 : .duration_since(SystemTime::UNIX_EPOCH)
827 0 : {
828 0 : self.feedback_last_time_seconds
829 0 : .with_label_values(labels)
830 0 : .set(unix_time.as_secs());
831 0 : }
832 :
833 0 : if tli.last_removed_segno != 0 {
834 0 : let segno_count = tli
835 0 : .flush_lsn
836 0 : .segment_number(tli.persisted_state.server.wal_seg_size as usize)
837 0 : - tli.last_removed_segno;
838 0 : let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
839 0 : self.disk_usage
840 0 : .with_label_values(labels)
841 0 : .set(disk_usage_bytes);
842 0 : }
843 : }
844 :
845 : // collect MetricFamilies.
846 0 : let mut mfs = Vec::new();
847 0 : mfs.extend(self.commit_lsn.collect());
848 0 : mfs.extend(self.backup_lsn.collect());
849 0 : mfs.extend(self.flush_lsn.collect());
850 0 : mfs.extend(self.epoch_start_lsn.collect());
851 0 : mfs.extend(self.peer_horizon_lsn.collect());
852 0 : mfs.extend(self.remote_consistent_lsn.collect());
853 0 : mfs.extend(self.ps_last_received_lsn.collect());
854 0 : mfs.extend(self.feedback_last_time_seconds.collect());
855 0 : mfs.extend(self.ps_feedback_count.collect());
856 0 : mfs.extend(self.timeline_active.collect());
857 0 : mfs.extend(self.wal_backup_active.collect());
858 0 : mfs.extend(self.connected_computes.collect());
859 0 : mfs.extend(self.disk_usage.collect());
860 0 : mfs.extend(self.acceptor_term.collect());
861 0 : mfs.extend(self.written_wal_bytes.collect());
862 0 : mfs.extend(self.interpreted_wal_reader_tasks.collect());
863 0 : mfs.extend(self.written_wal_seconds.collect());
864 0 : mfs.extend(self.flushed_wal_seconds.collect());
865 0 :
866 0 : // report time it took to collect all info
867 0 : let elapsed = start_collecting.elapsed().as_secs_f64();
868 0 : self.collect_timeline_metrics.set(elapsed);
869 0 : mfs.extend(self.collect_timeline_metrics.collect());
870 0 :
871 0 : // report total number of timelines
872 0 : self.timelines_count.set(timelines_count as i64);
873 0 : mfs.extend(self.timelines_count.collect());
874 0 :
875 0 : self.active_timelines_count
876 0 : .set(active_timelines_count as i64);
877 0 : mfs.extend(self.active_timelines_count.collect());
878 0 :
879 0 : mfs
880 0 : }
881 : }
882 :
883 0 : async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
884 0 : let mut res = vec![];
885 0 : let active_timelines = global_timelines.get_global_broker_active_set().get_all();
886 :
887 0 : for tli in active_timelines {
888 0 : if let Some(info) = tli.info_for_metrics().await {
889 0 : res.push(info);
890 0 : }
891 : }
892 0 : res
893 0 : }