LCOV - code coverage report
Current view: top level - safekeeper/src - metrics.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 17.3 % 539 93
Test Date: 2025-07-16 12:29:03 Functions: 35.9 % 64 23

            Line data    Source code
       1              : //! Global safekeeper metrics and per-timeline safekeeper metrics.
       2              : 
       3              : use std::sync::{Arc, RwLock};
       4              : use std::time::{Instant, SystemTime};
       5              : 
       6              : use anyhow::Result;
       7              : use futures::Future;
       8              : use metrics::core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts};
       9              : use metrics::proto::MetricFamily;
      10              : use metrics::{
      11              :     DISK_FSYNC_SECONDS_BUCKETS, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
      12              :     IntCounterPair, IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, pow2_buckets,
      13              :     register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
      14              :     register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
      15              :     register_int_gauge_vec,
      16              : };
      17              : use once_cell::sync::Lazy;
      18              : use postgres_ffi::XLogSegNo;
      19              : use utils::id::TenantTimelineId;
      20              : use utils::lsn::Lsn;
      21              : use utils::pageserver_feedback::PageserverFeedback;
      22              : 
      23              : use crate::GlobalTimelines;
      24              : use crate::receive_wal::MSG_QUEUE_SIZE;
      25              : use crate::state::{TimelineMemState, TimelinePersistentState};
      26              : 
      27              : // Global metrics across all timelines.
      28            5 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
      29            5 :     register_histogram!(
      30              :         "safekeeper_write_wal_bytes",
      31              :         "Bytes written to WAL in a single request",
      32            5 :         vec![
      33              :             1.0,
      34              :             10.0,
      35              :             100.0,
      36              :             1024.0,
      37              :             8192.0,
      38            5 :             128.0 * 1024.0,
      39            5 :             1024.0 * 1024.0,
      40            5 :             10.0 * 1024.0 * 1024.0
      41              :         ]
      42              :     )
      43            5 :     .expect("Failed to register safekeeper_write_wal_bytes histogram")
      44            5 : });
      45            5 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      46            5 :     register_histogram!(
      47              :         "safekeeper_write_wal_seconds",
      48              :         "Seconds spent writing and syncing WAL to a disk in a single request",
      49            5 :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
      50              :     )
      51            5 :     .expect("Failed to register safekeeper_write_wal_seconds histogram")
      52            5 : });
      53            5 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      54            5 :     register_histogram!(
      55              :         "safekeeper_flush_wal_seconds",
      56              :         "Seconds spent syncing WAL to a disk (excluding segment initialization)",
      57            5 :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
      58              :     )
      59            5 :     .expect("Failed to register safekeeper_flush_wal_seconds histogram")
      60            5 : });
      61              : /* BEGIN_HADRON */
      62              : // Counter of all ProposerAcceptorMessage requests received
      63           17 : pub static PROPOSER_ACCEPTOR_MESSAGES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
      64           17 :     register_int_counter_vec!(
      65              :         "safekeeper_proposer_acceptor_messages_total",
      66              :         "Total number of ProposerAcceptorMessage requests received by the Safekeeper.",
      67           17 :         &["outcome"]
      68              :     )
      69           17 :     .expect("Failed to register safekeeper_proposer_acceptor_messages_total counter")
      70           17 : });
      71            0 : pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
      72            0 :     register_int_counter!(
      73              :         "safekeeper_wal_disk_io_errors",
      74              :         "Number of disk I/O errors when creating and flushing WALs and control files"
      75              :     )
      76            0 :     .expect("Failed to register safekeeper_wal_disk_io_errors counter")
      77            0 : });
      78            0 : pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
      79            0 :     register_int_counter!(
      80              :         "safekeeper_wal_storage_limit_errors",
      81              :         concat!(
      82              :             "Number of errors due to timeline WAL storage utilization exceeding configured limit. ",
      83              :             "An increase in this metric indicates issues backing up or removing WALs."
      84              :         )
      85              :     )
      86            0 :     .expect("Failed to register safekeeper_wal_storage_limit_errors counter")
      87            0 : });
      88            0 : pub static SK_RECOVERY_PULL_TIMELINE_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
      89            0 :     register_int_counter!(
      90              :         "safekeeper_recovery_pull_timeline_errors",
      91              :         concat!(
      92              :             "Number of errors due to pull_timeline errors during SK lost disk recovery.",
      93              :             "An increase in this metric indicates pull timelines runs into error."
      94              :         )
      95              :     )
      96            0 :     .expect("Failed to register safekeeper_recovery_pull_timeline_errors counter")
      97            0 : });
      98            0 : pub static SK_RECOVERY_PULL_TIMELINE_OKS: Lazy<IntCounter> = Lazy::new(|| {
      99            0 :     register_int_counter!(
     100              :         "safekeeper_recovery_pull_timeline_oks",
     101              :         concat!(
     102              :             "Number of successful pull_timeline during SK lost disk recovery.",
     103              :             "An increase in this metric indicates pull timelines is successful."
     104              :         )
     105              :     )
     106            0 :     .expect("Failed to register safekeeper_recovery_pull_timeline_oks counter")
     107            0 : });
     108            0 : pub static SK_RECOVERY_PULL_TIMELINES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     109            0 :     register_histogram!(
     110              :         "safekeeper_recovery_pull_timelines_seconds",
     111              :         "Seconds to pull timelines",
     112            0 :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     113              :     )
     114            0 :     .expect("Failed to register safekeeper_recovery_pull_timelines_seconds histogram")
     115            0 : });
     116            0 : pub static SK_RECOVERY_PULL_TIMELINE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
     117            0 :     register_histogram_vec!(
     118              :         "safekeeper_recovery_pull_timeline_seconds",
     119              :         "Seconds to pull timeline",
     120            0 :         &["tenant_id", "timeline_id"],
     121            0 :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     122              :     )
     123            0 :     .expect("Failed to register safekeeper_recovery_pull_timeline_seconds histogram vec")
     124            0 : });
     125              : /* END_HADRON */
     126            7 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     127            7 :     register_histogram!(
     128              :         "safekeeper_persist_control_file_seconds",
     129              :         "Seconds to persist and sync control file",
     130            7 :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     131              :     )
     132            7 :     .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
     133            7 : });
     134            5 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
     135            5 :     register_histogram_vec!(
     136              :         "safekeeper_wal_storage_operation_seconds",
     137              :         "Seconds spent on WAL storage operations",
     138            5 :         &["operation"],
     139            5 :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     140              :     )
     141            5 :     .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
     142            5 : });
     143           15 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
     144           15 :     register_histogram_vec!(
     145              :         "safekeeper_misc_operation_seconds",
     146              :         "Seconds spent on miscellaneous operations",
     147           15 :         &["operation"],
     148           15 :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     149              :     )
     150           15 :     .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
     151           15 : });
     152            0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
     153            0 :     register_int_counter_vec!(
     154              :         "safekeeper_pg_io_bytes_total",
     155              :         "Bytes read from or written to any PostgreSQL connection",
     156            0 :         &["client_az", "sk_az", "app_name", "dir", "same_az"]
     157              :     )
     158            0 :     .expect("Failed to register safekeeper_pg_io_bytes gauge")
     159            0 : });
     160            0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
     161            0 :     register_int_counter!(
     162              :         "safekeeper_broker_pushed_updates_total",
     163              :         "Number of timeline updates pushed to the broker"
     164              :     )
     165            0 :     .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
     166            0 : });
     167            0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
     168            0 :     register_int_counter_vec!(
     169              :         "safekeeper_broker_pulled_updates_total",
     170              :         "Number of timeline updates pulled and processed from the broker",
     171            0 :         &["result"]
     172              :     )
     173            0 :     .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
     174            0 : });
     175            0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
     176            0 :     register_int_counter_pair_vec!(
     177              :         "safekeeper_pg_queries_received_total",
     178              :         "Number of queries received through pg protocol",
     179              :         "safekeeper_pg_queries_finished_total",
     180              :         "Number of queries finished through pg protocol",
     181            0 :         &["query"]
     182              :     )
     183            0 :     .expect("Failed to register safekeeper_pg_queries_finished_total counter")
     184            0 : });
     185            0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     186            0 :     register_int_counter!(
     187              :         "safekeeper_removed_wal_segments_total",
     188              :         "Number of WAL segments removed from the disk"
     189              :     )
     190            0 :     .expect("Failed to register safekeeper_removed_wal_segments_total counter")
     191            0 : });
     192            0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     193            0 :     register_int_counter!(
     194              :         "safekeeper_backed_up_segments_total",
     195              :         "Number of WAL segments backed up to the S3"
     196              :     )
     197            0 :     .expect("Failed to register safekeeper_backed_up_segments_total counter")
     198            0 : });
     199            0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
     200            0 :     register_int_counter!(
     201              :         "safekeeper_backup_errors_total",
     202              :         "Number of errors during backup"
     203              :     )
     204            0 :     .expect("Failed to register safekeeper_backup_errors_total counter")
     205            0 : });
     206              : /* BEGIN_HADRON */
     207            0 : pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
     208            0 :     register_int_counter!(
     209              :         "safekeeper_backup_reelect_leader_total",
     210              :         "Number of times the backup leader was reelected"
     211              :     )
     212            0 :     .expect("Failed to register safekeeper_backup_reelect_leader_total counter")
     213            0 : });
     214              : /* END_HADRON */
     215            0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     216            0 :     register_histogram!(
     217              :         "safekeeper_broker_push_update_seconds",
     218              :         "Seconds to push all timeline updates to the broker",
     219            0 :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     220              :     )
     221            0 :     .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
     222            0 : });
     223              : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
     224              :     1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
     225              : ];
     226            0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
     227            0 :     register_histogram!(
     228              :         "safekeeper_broker_iteration_timelines",
     229              :         "Count of timelines pushed to the broker in a single iteration",
     230            0 :         TIMELINES_COUNT_BUCKETS.to_vec()
     231              :     )
     232            0 :     .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
     233            0 : });
     234            0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
     235            0 :     register_int_counter!(
     236              :         "safekeeper_received_ps_feedbacks_total",
     237              :         "Number of pageserver feedbacks received"
     238              :     )
     239            0 :     .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
     240            0 : });
     241            0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
     242            0 :     register_int_counter_vec!(
     243              :         "safekeeper_partial_backup_uploads_total",
     244              :         "Number of partial backup uploads to the S3",
     245            0 :         &["result"]
     246              :     )
     247            0 :     .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
     248            0 : });
     249            0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
     250            0 :     register_int_counter!(
     251              :         "safekeeper_partial_backup_uploaded_bytes_total",
     252              :         "Number of bytes uploaded to the S3 during partial backup"
     253              :     )
     254            0 :     .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
     255            0 : });
     256            5 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
     257            5 :     register_int_counter!(
     258              :         "safekeeper_manager_iterations_total",
     259              :         "Number of iterations of the timeline manager task"
     260              :     )
     261            5 :     .expect("Failed to register safekeeper_manager_iterations_total counter")
     262            5 : });
     263            5 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
     264            5 :     register_int_counter!(
     265              :         "safekeeper_manager_active_changes_total",
     266              :         "Number of timeline active status changes in the timeline manager task"
     267              :     )
     268            5 :     .expect("Failed to register safekeeper_manager_active_changes_total counter")
     269            5 : });
     270            0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
     271            0 :     register_int_counter_pair!(
     272              :         "safekeeper_wal_backup_tasks_started_total",
     273              :         "Number of active WAL backup tasks",
     274              :         "safekeeper_wal_backup_tasks_finished_total",
     275              :         "Number of finished WAL backup tasks",
     276              :     )
     277            0 :     .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
     278            0 : });
     279            5 : pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
     280            5 :     register_int_gauge!(
     281              :         "safekeeper_wal_receivers",
     282              :         "Number of currently connected WAL receivers (i.e. connected computes)"
     283              :     )
     284            5 :     .expect("Failed to register safekeeper_wal_receivers")
     285            5 : });
     286            4 : pub static WAL_READERS: Lazy<IntGaugeVec> = Lazy::new(|| {
     287            4 :     register_int_gauge_vec!(
     288              :         "safekeeper_wal_readers",
     289              :         "Number of active WAL readers (may serve pageservers or other safekeepers)",
     290            4 :         &["kind", "target"]
     291              :     )
     292            4 :     .expect("Failed to register safekeeper_wal_receivers")
     293            4 : });
     294            5 : pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
     295              :     // Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
     296              :     // full queues respectively.
     297            5 :     let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
     298            5 :     buckets.insert(0, 0.0);
     299            5 :     buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
     300            5 :     assert!(buckets.len() <= 12, "too many histogram buckets");
     301              : 
     302            5 :     register_histogram!(
     303              :         "safekeeper_wal_receiver_queue_depth",
     304              :         "Number of queued messages per WAL receiver (sampled every 5 seconds)",
     305            5 :         buckets
     306              :     )
     307            5 :     .expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
     308            5 : });
     309            5 : pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
     310            5 :     register_int_gauge!(
     311              :         "safekeeper_wal_receiver_queue_depth_total",
     312              :         "Total number of queued messages across all WAL receivers",
     313              :     )
     314            5 :     .expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
     315            5 : });
     316              : // TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
     317              : // MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
     318            5 : pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
     319            5 :     register_int_gauge!(
     320              :         "safekeeper_wal_receiver_queue_size_total",
     321              :         "Total memory byte size of queued messages across all WAL receivers",
     322              :     )
     323            5 :     .expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
     324            5 : });
     325              : 
     326              : // Metrics collected on operations on the storage repository.
     327              : #[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]
     328              : #[strum(serialize_all = "kebab_case")]
     329              : pub(crate) enum EvictionEvent {
     330              :     Evict,
     331              :     Restore,
     332              : }
     333              : 
     334            0 : pub(crate) static EVICTION_EVENTS_STARTED: Lazy<IntCounterVec> = Lazy::new(|| {
     335            0 :     register_int_counter_vec!(
     336              :         "safekeeper_eviction_events_started_total",
     337              :         "Number of eviction state changes, incremented when they start",
     338            0 :         &["kind"]
     339              :     )
     340            0 :     .expect("Failed to register metric")
     341            0 : });
     342              : 
     343            0 : pub(crate) static EVICTION_EVENTS_COMPLETED: Lazy<IntCounterVec> = Lazy::new(|| {
     344            0 :     register_int_counter_vec!(
     345              :         "safekeeper_eviction_events_completed_total",
     346              :         "Number of eviction state changes, incremented when they complete",
     347            0 :         &["kind"]
     348              :     )
     349            0 :     .expect("Failed to register metric")
     350            0 : });
     351              : 
     352            0 : pub static NUM_EVICTED_TIMELINES: Lazy<IntGauge> = Lazy::new(|| {
     353            0 :     register_int_gauge!(
     354              :         "safekeeper_evicted_timelines",
     355              :         "Number of currently evicted timelines"
     356              :     )
     357            0 :     .expect("Failed to register metric")
     358            0 : });
     359              : 
     360              : pub const LABEL_UNKNOWN: &str = "unknown";
     361              : 
     362              : /// Labels for traffic metrics.
     363              : #[derive(Clone)]
     364              : struct ConnectionLabels {
     365              :     /// Availability zone of the connection origin.
     366              :     client_az: String,
     367              :     /// Availability zone of the current safekeeper.
     368              :     sk_az: String,
     369              :     /// Client application name.
     370              :     app_name: String,
     371              : }
     372              : 
     373              : impl ConnectionLabels {
     374            0 :     fn new() -> Self {
     375            0 :         Self {
     376            0 :             client_az: LABEL_UNKNOWN.to_string(),
     377            0 :             sk_az: LABEL_UNKNOWN.to_string(),
     378            0 :             app_name: LABEL_UNKNOWN.to_string(),
     379            0 :         }
     380            0 :     }
     381              : 
     382            0 :     fn build_metrics(
     383            0 :         &self,
     384            0 :     ) -> (
     385            0 :         GenericCounter<metrics::core::AtomicU64>,
     386            0 :         GenericCounter<metrics::core::AtomicU64>,
     387            0 :     ) {
     388            0 :         let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
     389            0 :             (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
     390            0 :             (client_az, sk_az) => {
     391            0 :                 if client_az == sk_az {
     392            0 :                     "true"
     393              :                 } else {
     394            0 :                     "false"
     395              :                 }
     396              :             }
     397              :         };
     398              : 
     399            0 :         let read = PG_IO_BYTES.with_label_values(&[
     400            0 :             &self.client_az,
     401            0 :             &self.sk_az,
     402            0 :             &self.app_name,
     403            0 :             "read",
     404            0 :             same_az,
     405            0 :         ]);
     406            0 :         let write = PG_IO_BYTES.with_label_values(&[
     407            0 :             &self.client_az,
     408            0 :             &self.sk_az,
     409            0 :             &self.app_name,
     410            0 :             "write",
     411            0 :             same_az,
     412            0 :         ]);
     413            0 :         (read, write)
     414            0 :     }
     415              : }
     416              : 
     417              : struct TrafficMetricsState {
     418              :     /// Labels for traffic metrics.
     419              :     labels: ConnectionLabels,
     420              :     /// Total bytes read from this connection.
     421              :     read: GenericCounter<metrics::core::AtomicU64>,
     422              :     /// Total bytes written to this connection.
     423              :     write: GenericCounter<metrics::core::AtomicU64>,
     424              : }
     425              : 
     426              : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
     427              : #[derive(Clone)]
     428              : pub struct TrafficMetrics {
     429              :     state: Arc<RwLock<TrafficMetricsState>>,
     430              : }
     431              : 
     432              : impl Default for TrafficMetrics {
     433            0 :     fn default() -> Self {
     434            0 :         Self::new()
     435            0 :     }
     436              : }
     437              : 
     438              : impl TrafficMetrics {
     439            0 :     pub fn new() -> Self {
     440            0 :         let labels = ConnectionLabels::new();
     441            0 :         let (read, write) = labels.build_metrics();
     442            0 :         let state = TrafficMetricsState {
     443            0 :             labels,
     444            0 :             read,
     445            0 :             write,
     446            0 :         };
     447            0 :         Self {
     448            0 :             state: Arc::new(RwLock::new(state)),
     449            0 :         }
     450            0 :     }
     451              : 
     452            0 :     pub fn set_client_az(&self, value: &str) {
     453            0 :         let mut state = self.state.write().unwrap();
     454            0 :         state.labels.client_az = value.to_string();
     455            0 :         (state.read, state.write) = state.labels.build_metrics();
     456            0 :     }
     457              : 
     458            0 :     pub fn set_sk_az(&self, value: &str) {
     459            0 :         let mut state = self.state.write().unwrap();
     460            0 :         state.labels.sk_az = value.to_string();
     461            0 :         (state.read, state.write) = state.labels.build_metrics();
     462            0 :     }
     463              : 
     464            0 :     pub fn set_app_name(&self, value: &str) {
     465            0 :         let mut state = self.state.write().unwrap();
     466            0 :         state.labels.app_name = value.to_string();
     467            0 :         (state.read, state.write) = state.labels.build_metrics();
     468            0 :     }
     469              : 
     470            0 :     pub fn observe_read(&self, cnt: usize) {
     471            0 :         self.state.read().unwrap().read.inc_by(cnt as u64)
     472            0 :     }
     473              : 
     474            0 :     pub fn observe_write(&self, cnt: usize) {
     475            0 :         self.state.read().unwrap().write.inc_by(cnt as u64)
     476            0 :     }
     477              : }
     478              : 
     479              : /// Metrics for WalStorage in a single timeline.
     480              : #[derive(Clone, Default)]
     481              : pub struct WalStorageMetrics {
     482              :     /// How many bytes were written in total.
     483              :     write_wal_bytes: u64,
     484              :     /// How much time spent writing WAL to disk, waiting for write(2).
     485              :     write_wal_seconds: f64,
     486              :     /// How much time spent syncing WAL to disk, waiting for fsync(2).
     487              :     flush_wal_seconds: f64,
     488              : }
     489              : 
     490              : impl WalStorageMetrics {
     491          620 :     pub fn observe_write_bytes(&mut self, bytes: usize) {
     492          620 :         self.write_wal_bytes += bytes as u64;
     493          620 :         WRITE_WAL_BYTES.observe(bytes as f64);
     494          620 :     }
     495              : 
     496          620 :     pub fn observe_write_seconds(&mut self, seconds: f64) {
     497          620 :         self.write_wal_seconds += seconds;
     498          620 :         WRITE_WAL_SECONDS.observe(seconds);
     499          620 :     }
     500              : 
     501          625 :     pub fn observe_flush_seconds(&mut self, seconds: f64) {
     502          625 :         self.flush_wal_seconds += seconds;
     503          625 :         FLUSH_WAL_SECONDS.observe(seconds);
     504          625 :     }
     505              : }
     506              : 
     507              : /// Awaits a future that resolves to an empty result and returns how long it took, in seconds; an error from the future is propagated.
     508         1245 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
     509         1245 :     closure: impl Future<Output = Result<(), E>>,
     510         1245 : ) -> Result<f64> {
     511         1245 :     let start = std::time::Instant::now();
     512         1245 :     closure.await.map_err(|e| e.into())?;
     513         1245 :     Ok(start.elapsed().as_secs_f64())
     514         1245 : }
     515              : 
     516              : /// Metrics for a single timeline.
     517              : #[derive(Clone)]
     518              : pub struct FullTimelineInfo {
     519              :     pub ttid: TenantTimelineId,
     520              :     pub ps_feedback_count: u64,
     521              :     pub last_ps_feedback: PageserverFeedback,
     522              :     pub wal_backup_active: bool,
     523              :     pub timeline_is_active: bool,
     524              :     pub num_computes: u32,
     525              :     pub last_removed_segno: XLogSegNo,
     526              :     pub interpreted_wal_reader_tasks: usize,
     527              : 
     528              :     pub epoch_start_lsn: Lsn,
     529              :     pub mem_state: TimelineMemState,
     530              :     pub persisted_state: TimelinePersistentState,
     531              : 
     532              :     pub flush_lsn: Lsn,
     533              : 
     534              :     pub wal_storage: WalStorageMetrics,
     535              : }
     536              : 
     537              : /// Collects metrics for all active timelines.
     538              : pub struct TimelineCollector {
     539              :     global_timelines: Arc<GlobalTimelines>,
     540              :     descs: Vec<Desc>,
     541              :     commit_lsn: GenericGaugeVec<AtomicU64>,
     542              :     backup_lsn: GenericGaugeVec<AtomicU64>,
     543              :     flush_lsn: GenericGaugeVec<AtomicU64>,
     544              :     epoch_start_lsn: GenericGaugeVec<AtomicU64>,
     545              :     peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
     546              :     remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
     547              :     ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
     548              :     feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
     549              :     ps_feedback_count: GenericGaugeVec<AtomicU64>,
     550              :     timeline_active: GenericGaugeVec<AtomicU64>,
     551              :     wal_backup_active: GenericGaugeVec<AtomicU64>,
     552              :     connected_computes: IntGaugeVec,
     553              :     disk_usage: GenericGaugeVec<AtomicU64>,
     554              :     acceptor_term: GenericGaugeVec<AtomicU64>,
     555              :     written_wal_bytes: GenericGaugeVec<AtomicU64>,
     556              :     interpreted_wal_reader_tasks: GenericGaugeVec<AtomicU64>,
     557              :     written_wal_seconds: GaugeVec,
     558              :     flushed_wal_seconds: GaugeVec,
     559              :     collect_timeline_metrics: Gauge,
     560              :     timelines_count: IntGauge,
     561              :     active_timelines_count: IntGauge,
     562              : }
     563              : 
     564              : impl TimelineCollector {
     565            0 :     pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
     566            0 :         let mut descs = Vec::new();
     567              : 
     568            0 :         let commit_lsn = GenericGaugeVec::new(
     569            0 :             Opts::new(
     570              :                 "safekeeper_commit_lsn",
     571              :                 "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
     572              :             ),
     573            0 :             &["tenant_id", "timeline_id"],
     574              :         )
     575            0 :         .unwrap();
     576            0 :         descs.extend(commit_lsn.desc().into_iter().cloned());
     577              : 
     578            0 :         let backup_lsn = GenericGaugeVec::new(
     579            0 :             Opts::new(
     580              :                 "safekeeper_backup_lsn",
     581              :                 "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
     582              :             ),
     583            0 :             &["tenant_id", "timeline_id"],
     584              :         )
     585            0 :         .unwrap();
     586            0 :         descs.extend(backup_lsn.desc().into_iter().cloned());
     587              : 
     588            0 :         let flush_lsn = GenericGaugeVec::new(
     589            0 :             Opts::new(
     590              :                 "safekeeper_flush_lsn",
     591              :                 "Current flush_lsn, grouped by timeline",
     592              :             ),
     593            0 :             &["tenant_id", "timeline_id"],
     594              :         )
     595            0 :         .unwrap();
     596            0 :         descs.extend(flush_lsn.desc().into_iter().cloned());
     597              : 
     598            0 :         let epoch_start_lsn = GenericGaugeVec::new(
     599            0 :             Opts::new(
     600              :                 "safekeeper_epoch_start_lsn",
     601              :                 "Point since which compute generates new WAL in the current consensus term",
     602              :             ),
     603            0 :             &["tenant_id", "timeline_id"],
     604              :         )
     605            0 :         .unwrap();
     606            0 :         descs.extend(epoch_start_lsn.desc().into_iter().cloned());
     607              : 
     608            0 :         let peer_horizon_lsn = GenericGaugeVec::new(
     609            0 :             Opts::new(
     610              :                 "safekeeper_peer_horizon_lsn",
     611              :                 "LSN of the most lagging safekeeper",
     612              :             ),
     613            0 :             &["tenant_id", "timeline_id"],
     614              :         )
     615            0 :         .unwrap();
     616            0 :         descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
     617              : 
     618            0 :         let remote_consistent_lsn = GenericGaugeVec::new(
     619            0 :             Opts::new(
     620              :                 "safekeeper_remote_consistent_lsn",
     621              :                 "LSN which is persisted to the remote storage in pageserver",
     622              :             ),
     623            0 :             &["tenant_id", "timeline_id"],
     624              :         )
     625            0 :         .unwrap();
     626            0 :         descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
     627              : 
     628            0 :         let ps_last_received_lsn = GenericGaugeVec::new(
     629            0 :             Opts::new(
     630              :                 "safekeeper_ps_last_received_lsn",
     631              :                 "Last LSN received by the pageserver, acknowledged in the feedback",
     632              :             ),
     633            0 :             &["tenant_id", "timeline_id"],
     634              :         )
     635            0 :         .unwrap();
     636            0 :         descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
     637              : 
     638            0 :         let feedback_last_time_seconds = GenericGaugeVec::new(
     639            0 :             Opts::new(
     640              :                 "safekeeper_feedback_last_time_seconds",
     641              :                 "Timestamp of the last feedback from the pageserver",
     642              :             ),
     643            0 :             &["tenant_id", "timeline_id"],
     644              :         )
     645            0 :         .unwrap();
     646            0 :         descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
     647              : 
     648            0 :         let ps_feedback_count = GenericGaugeVec::new(
     649            0 :             Opts::new(
     650              :                 "safekeeper_ps_feedback_count_total",
     651              :                 "Number of feedbacks received from the pageserver",
     652              :             ),
     653            0 :             &["tenant_id", "timeline_id"],
     654              :         )
     655            0 :         .unwrap();
     656              : 
     657            0 :         let timeline_active = GenericGaugeVec::new(
     658            0 :             Opts::new(
     659              :                 "safekeeper_timeline_active",
     660              :                 "Reports 1 for active timelines, 0 for inactive",
     661              :             ),
     662            0 :             &["tenant_id", "timeline_id"],
     663              :         )
     664            0 :         .unwrap();
     665            0 :         descs.extend(timeline_active.desc().into_iter().cloned());
     666              : 
     667            0 :         let wal_backup_active = GenericGaugeVec::new(
     668            0 :             Opts::new(
     669              :                 "safekeeper_wal_backup_active",
     670              :                 "Reports 1 for timelines with active WAL backup, 0 otherwise",
     671              :             ),
     672            0 :             &["tenant_id", "timeline_id"],
     673              :         )
     674            0 :         .unwrap();
     675            0 :         descs.extend(wal_backup_active.desc().into_iter().cloned());
     676              : 
     677            0 :         let connected_computes = IntGaugeVec::new(
     678            0 :             Opts::new(
     679              :                 "safekeeper_connected_computes",
     680              :                 "Number of active compute connections",
     681              :             ),
     682            0 :             &["tenant_id", "timeline_id"],
     683              :         )
     684            0 :         .unwrap();
     685            0 :         descs.extend(connected_computes.desc().into_iter().cloned());
     686              : 
     687            0 :         let disk_usage = GenericGaugeVec::new(
     688            0 :             Opts::new(
     689              :                 "safekeeper_disk_usage_bytes",
     690              :                 "Estimated disk space used to store WAL segments",
     691              :             ),
     692            0 :             &["tenant_id", "timeline_id"],
     693              :         )
     694            0 :         .unwrap();
     695            0 :         descs.extend(disk_usage.desc().into_iter().cloned());
     696              : 
     697            0 :         let acceptor_term = GenericGaugeVec::new(
     698            0 :             Opts::new("safekeeper_acceptor_term", "Current consensus term"),
     699            0 :             &["tenant_id", "timeline_id"],
     700              :         )
     701            0 :         .unwrap();
     702            0 :         descs.extend(acceptor_term.desc().into_iter().cloned());
     703              : 
     704            0 :         let written_wal_bytes = GenericGaugeVec::new(
     705            0 :             Opts::new(
     706              :                 "safekeeper_written_wal_bytes_total",
     707              :                 "Number of WAL bytes written to disk, grouped by timeline",
     708              :             ),
     709            0 :             &["tenant_id", "timeline_id"],
     710              :         )
     711            0 :         .unwrap();
     712            0 :         descs.extend(written_wal_bytes.desc().into_iter().cloned());
     713              : 
     714            0 :         let written_wal_seconds = GaugeVec::new(
     715            0 :             Opts::new(
     716              :                 "safekeeper_written_wal_seconds_total",
     717              :                 "Total time spent in write(2) writing WAL to disk, grouped by timeline",
     718              :             ),
     719            0 :             &["tenant_id", "timeline_id"],
     720              :         )
     721            0 :         .unwrap();
     722            0 :         descs.extend(written_wal_seconds.desc().into_iter().cloned());
     723              : 
     724            0 :         let flushed_wal_seconds = GaugeVec::new(
     725            0 :             Opts::new(
     726              :                 "safekeeper_flushed_wal_seconds_total",
     727              :                 "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
     728              :             ),
     729            0 :             &["tenant_id", "timeline_id"],
     730              :         )
     731            0 :         .unwrap();
     732            0 :         descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
     733              : 
     734            0 :         let collect_timeline_metrics = Gauge::new(
     735              :             "safekeeper_collect_timeline_metrics_seconds",
     736              :             "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
     737              :         )
     738            0 :         .unwrap();
     739            0 :         descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
     740              : 
     741            0 :         let timelines_count = IntGauge::new(
     742              :             "safekeeper_timelines",
     743              :             "Total number of timelines loaded in-memory",
     744              :         )
     745            0 :         .unwrap();
     746            0 :         descs.extend(timelines_count.desc().into_iter().cloned());
     747              : 
     748            0 :         let active_timelines_count = IntGauge::new(
     749              :             "safekeeper_active_timelines",
     750              :             "Total number of active timelines",
     751              :         )
     752            0 :         .unwrap();
     753            0 :         descs.extend(active_timelines_count.desc().into_iter().cloned());
     754              : 
     755            0 :         let interpreted_wal_reader_tasks = GenericGaugeVec::new(
     756            0 :             Opts::new(
     757              :                 "safekeeper_interpreted_wal_reader_tasks",
     758              :                 "Number of active interpreted wal reader tasks, grouped by timeline",
     759              :             ),
     760            0 :             &["tenant_id", "timeline_id"],
     761              :         )
     762            0 :         .unwrap();
     763            0 :         descs.extend(interpreted_wal_reader_tasks.desc().into_iter().cloned());
     764              : 
     765            0 :         TimelineCollector {
     766            0 :             global_timelines,
     767            0 :             descs,
     768            0 :             commit_lsn,
     769            0 :             backup_lsn,
     770            0 :             flush_lsn,
     771            0 :             epoch_start_lsn,
     772            0 :             peer_horizon_lsn,
     773            0 :             remote_consistent_lsn,
     774            0 :             ps_last_received_lsn,
     775            0 :             feedback_last_time_seconds,
     776            0 :             ps_feedback_count,
     777            0 :             timeline_active,
     778            0 :             wal_backup_active,
     779            0 :             connected_computes,
     780            0 :             disk_usage,
     781            0 :             acceptor_term,
     782            0 :             written_wal_bytes,
     783            0 :             written_wal_seconds,
     784            0 :             flushed_wal_seconds,
     785            0 :             collect_timeline_metrics,
     786            0 :             timelines_count,
     787            0 :             active_timelines_count,
     788            0 :             interpreted_wal_reader_tasks,
     789            0 :         }
     790            0 :     }
     791              : }
     792              : 
     793              : impl Collector for TimelineCollector {
     794            0 :     fn desc(&self) -> Vec<&Desc> {
     795            0 :         self.descs.iter().collect()
     796            0 :     }
     797              : 
     798            0 :     fn collect(&self) -> Vec<MetricFamily> {
     799            0 :         let start_collecting = Instant::now();
     800              : 
     801              :         // reset all metrics to clean up inactive timelines
     802            0 :         self.commit_lsn.reset();
     803            0 :         self.backup_lsn.reset();
     804            0 :         self.flush_lsn.reset();
     805            0 :         self.epoch_start_lsn.reset();
     806            0 :         self.peer_horizon_lsn.reset();
     807            0 :         self.remote_consistent_lsn.reset();
     808            0 :         self.ps_last_received_lsn.reset();
     809            0 :         self.feedback_last_time_seconds.reset();
     810            0 :         self.ps_feedback_count.reset();
     811            0 :         self.timeline_active.reset();
     812            0 :         self.wal_backup_active.reset();
     813            0 :         self.connected_computes.reset();
     814            0 :         self.disk_usage.reset();
     815            0 :         self.acceptor_term.reset();
     816            0 :         self.written_wal_bytes.reset();
     817            0 :         self.interpreted_wal_reader_tasks.reset();
     818            0 :         self.written_wal_seconds.reset();
     819            0 :         self.flushed_wal_seconds.reset();
     820              : 
     821            0 :         let timelines_count = self.global_timelines.get_all().len();
     822            0 :         let mut active_timelines_count = 0;
     823              : 
     824              :         // Prometheus Collector is sync, and data is stored under async lock. To
     825              :         // bridge the gap with a crutch, collect data in spawned thread with
     826              :         // local tokio runtime.
     827            0 :         let global_timelines = self.global_timelines.clone();
     828            0 :         let infos = std::thread::spawn(|| {
     829            0 :             let rt = tokio::runtime::Builder::new_current_thread()
     830            0 :                 .build()
     831            0 :                 .expect("failed to create rt");
     832            0 :             rt.block_on(collect_timeline_metrics(global_timelines))
     833            0 :         })
     834            0 :         .join()
     835            0 :         .expect("collect_timeline_metrics thread panicked");
     836              : 
     837            0 :         for tli in &infos {
     838            0 :             let tenant_id = tli.ttid.tenant_id.to_string();
     839            0 :             let timeline_id = tli.ttid.timeline_id.to_string();
     840            0 :             let labels = &[tenant_id.as_str(), timeline_id.as_str()];
     841              : 
     842            0 :             if tli.timeline_is_active {
     843            0 :                 active_timelines_count += 1;
     844            0 :             }
     845              : 
     846            0 :             self.commit_lsn
     847            0 :                 .with_label_values(labels)
     848            0 :                 .set(tli.mem_state.commit_lsn.into());
     849            0 :             self.backup_lsn
     850            0 :                 .with_label_values(labels)
     851            0 :                 .set(tli.mem_state.backup_lsn.into());
     852            0 :             self.flush_lsn
     853            0 :                 .with_label_values(labels)
     854            0 :                 .set(tli.flush_lsn.into());
     855            0 :             self.epoch_start_lsn
     856            0 :                 .with_label_values(labels)
     857            0 :                 .set(tli.epoch_start_lsn.into());
     858            0 :             self.peer_horizon_lsn
     859            0 :                 .with_label_values(labels)
     860            0 :                 .set(tli.mem_state.peer_horizon_lsn.into());
     861            0 :             self.remote_consistent_lsn
     862            0 :                 .with_label_values(labels)
     863            0 :                 .set(tli.mem_state.remote_consistent_lsn.into());
     864            0 :             self.timeline_active
     865            0 :                 .with_label_values(labels)
     866            0 :                 .set(tli.timeline_is_active as u64);
     867            0 :             self.wal_backup_active
     868            0 :                 .with_label_values(labels)
     869            0 :                 .set(tli.wal_backup_active as u64);
     870            0 :             self.connected_computes
     871            0 :                 .with_label_values(labels)
     872            0 :                 .set(tli.num_computes as i64);
     873            0 :             self.acceptor_term
     874            0 :                 .with_label_values(labels)
     875            0 :                 .set(tli.persisted_state.acceptor_state.term);
     876            0 :             self.written_wal_bytes
     877            0 :                 .with_label_values(labels)
     878            0 :                 .set(tli.wal_storage.write_wal_bytes);
     879            0 :             self.interpreted_wal_reader_tasks
     880            0 :                 .with_label_values(labels)
     881            0 :                 .set(tli.interpreted_wal_reader_tasks as u64);
     882            0 :             self.written_wal_seconds
     883            0 :                 .with_label_values(labels)
     884            0 :                 .set(tli.wal_storage.write_wal_seconds);
     885            0 :             self.flushed_wal_seconds
     886            0 :                 .with_label_values(labels)
     887            0 :                 .set(tli.wal_storage.flush_wal_seconds);
     888              : 
     889            0 :             self.ps_last_received_lsn
     890            0 :                 .with_label_values(labels)
     891            0 :                 .set(tli.last_ps_feedback.last_received_lsn.0);
     892            0 :             self.ps_feedback_count
     893            0 :                 .with_label_values(labels)
     894            0 :                 .set(tli.ps_feedback_count);
     895            0 :             if let Ok(unix_time) = tli
     896            0 :                 .last_ps_feedback
     897            0 :                 .replytime
     898            0 :                 .duration_since(SystemTime::UNIX_EPOCH)
     899            0 :             {
     900            0 :                 self.feedback_last_time_seconds
     901            0 :                     .with_label_values(labels)
     902            0 :                     .set(unix_time.as_secs());
     903            0 :             }
     904              : 
     905            0 :             if tli.last_removed_segno != 0 {
     906            0 :                 let segno_count = tli
     907            0 :                     .flush_lsn
     908            0 :                     .segment_number(tli.persisted_state.server.wal_seg_size as usize)
     909            0 :                     - tli.last_removed_segno;
     910            0 :                 let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
     911            0 :                 self.disk_usage
     912            0 :                     .with_label_values(labels)
     913            0 :                     .set(disk_usage_bytes);
     914            0 :             }
     915              :         }
     916              : 
     917              :         // collect MetricFamilys.
     918            0 :         let mut mfs = Vec::new();
     919            0 :         mfs.extend(self.commit_lsn.collect());
     920            0 :         mfs.extend(self.backup_lsn.collect());
     921            0 :         mfs.extend(self.flush_lsn.collect());
     922            0 :         mfs.extend(self.epoch_start_lsn.collect());
     923            0 :         mfs.extend(self.peer_horizon_lsn.collect());
     924            0 :         mfs.extend(self.remote_consistent_lsn.collect());
     925            0 :         mfs.extend(self.ps_last_received_lsn.collect());
     926            0 :         mfs.extend(self.feedback_last_time_seconds.collect());
     927            0 :         mfs.extend(self.ps_feedback_count.collect());
     928            0 :         mfs.extend(self.timeline_active.collect());
     929            0 :         mfs.extend(self.wal_backup_active.collect());
     930            0 :         mfs.extend(self.connected_computes.collect());
     931            0 :         mfs.extend(self.disk_usage.collect());
     932            0 :         mfs.extend(self.acceptor_term.collect());
     933            0 :         mfs.extend(self.written_wal_bytes.collect());
     934            0 :         mfs.extend(self.interpreted_wal_reader_tasks.collect());
     935            0 :         mfs.extend(self.written_wal_seconds.collect());
     936            0 :         mfs.extend(self.flushed_wal_seconds.collect());
     937              : 
     938              :         // report time it took to collect all info
     939            0 :         let elapsed = start_collecting.elapsed().as_secs_f64();
     940            0 :         self.collect_timeline_metrics.set(elapsed);
     941            0 :         mfs.extend(self.collect_timeline_metrics.collect());
     942              : 
     943              :         // report total number of timelines
     944            0 :         self.timelines_count.set(timelines_count as i64);
     945            0 :         mfs.extend(self.timelines_count.collect());
     946              : 
     947            0 :         self.active_timelines_count
     948            0 :             .set(active_timelines_count as i64);
     949            0 :         mfs.extend(self.active_timelines_count.collect());
     950              : 
     951            0 :         mfs
     952            0 :     }
     953              : }
     954              : 
     955            0 : async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
     956            0 :     let mut res = vec![];
     957            0 :     let active_timelines = global_timelines.get_global_broker_active_set().get_all();
     958              : 
     959            0 :     for tli in active_timelines {
     960            0 :         if let Some(info) = tli.info_for_metrics().await {
     961            0 :             res.push(info);
     962            0 :         }
     963              :     }
     964            0 :     res
     965            0 : }
        

Generated by: LCOV version 2.1-beta