LCOV - differential code coverage report
Current view: top level - safekeeper/src - metrics.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 94.8 % 553 524 29 524
Current Date: 2023-10-19 02:04:12 Functions: 77.0 % 61 47 14 47
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Global safekeeper metrics and per-timeline safekeeper metrics.
       2                 : 
       3                 : use std::{
       4                 :     sync::{Arc, RwLock},
       5                 :     time::{Instant, SystemTime},
       6                 : };
       7                 : 
       8                 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
       9                 : use anyhow::Result;
      10                 : use futures::Future;
      11                 : use metrics::{
      12                 :     core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
      13                 :     proto::MetricFamily,
      14                 :     register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec,
      15                 : };
      16                 : use once_cell::sync::Lazy;
      17                 : 
      18                 : use postgres_ffi::XLogSegNo;
      19                 : use utils::pageserver_feedback::PageserverFeedback;
      20                 : use utils::{id::TenantTimelineId, lsn::Lsn};
      21                 : 
      22                 : use crate::{
      23                 :     safekeeper::{SafeKeeperState, SafekeeperMemState},
      24                 :     GlobalTimelines,
      25                 : };
      26                 : 
      27                 : // Global metrics across all timelines.
      28 CBC         435 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
      29             435 :     register_histogram!(
      30             435 :         "safekeeper_write_wal_bytes",
      31             435 :         "Bytes written to WAL in a single request",
      32             435 :         vec![
      33             435 :             1.0,
      34             435 :             10.0,
      35             435 :             100.0,
      36             435 :             1024.0,
      37             435 :             8192.0,
      38             435 :             128.0 * 1024.0,
      39             435 :             1024.0 * 1024.0,
      40             435 :             10.0 * 1024.0 * 1024.0
      41             435 :         ]
      42             435 :     )
      43             435 :     .expect("Failed to register safekeeper_write_wal_bytes histogram")
      44             435 : });
      45             435 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      46             435 :     register_histogram!(
      47             435 :         "safekeeper_write_wal_seconds",
      48             435 :         "Seconds spent writing and syncing WAL to a disk in a single request",
      49             435 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      50             435 :     )
      51             435 :     .expect("Failed to register safekeeper_write_wal_seconds histogram")
      52             435 : });
      53 UBC           0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      54               0 :     register_histogram!(
      55               0 :         "safekeeper_flush_wal_seconds",
      56               0 :         "Seconds spent syncing WAL to a disk",
      57               0 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      58               0 :     )
      59               0 :     .expect("Failed to register safekeeper_flush_wal_seconds histogram")
      60               0 : });
      61 CBC         439 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      62             439 :     register_histogram!(
      63             439 :         "safekeeper_persist_control_file_seconds",
      64             439 :         "Seconds to persist and sync control file",
      65             439 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      66             439 :     )
      67             439 :     .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
      68             439 : });
      69             440 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
      70             440 :     register_int_counter_vec!(
      71             440 :         "safekeeper_pg_io_bytes_total",
      72             440 :         "Bytes read from or written to any PostgreSQL connection",
      73             440 :         &["client_az", "sk_az", "app_name", "dir", "same_az"]
      74             440 :     )
      75             440 :     .expect("Failed to register safekeeper_pg_io_bytes gauge")
      76             440 : });
      77             420 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
      78             420 :     register_int_counter!(
      79             420 :         "safekeeper_broker_pushed_updates_total",
      80             420 :         "Number of timeline updates pushed to the broker"
      81             420 :     )
      82             420 :     .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
      83             420 : });
      84             500 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
      85             500 :     register_int_counter_vec!(
      86             500 :         "safekeeper_broker_pulled_updates_total",
      87             500 :         "Number of timeline updates pulled and processed from the broker",
      88             500 :         &["result"]
      89             500 :     )
      90             500 :     .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
      91             500 : });
      92             440 : pub static PG_QUERIES_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
      93             440 :     register_int_counter_vec!(
      94             440 :         "safekeeper_pg_queries_received_total",
      95             440 :         "Number of queries received through pg protocol",
      96             440 :         &["query"]
      97             440 :     )
      98             440 :     .expect("Failed to register safekeeper_pg_queries_received_total counter")
      99             440 : });
     100             434 : pub static PG_QUERIES_FINISHED: Lazy<IntCounterVec> = Lazy::new(|| {
     101             434 :     register_int_counter_vec!(
     102             434 :         "safekeeper_pg_queries_finished_total",
     103             434 :         "Number of queries finished through pg protocol",
     104             434 :         &["query"]
     105             434 :     )
     106             434 :     .expect("Failed to register safekeeper_pg_queries_finished_total counter")
     107             434 : });
     108              13 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     109              13 :     register_int_counter!(
     110              13 :         "safekeeper_removed_wal_segments_total",
     111              13 :         "Number of WAL segments removed from the disk"
     112              13 :     )
     113              13 :     .expect("Failed to register safekeeper_removed_wal_segments_total counter")
     114              13 : });
     115              15 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     116              15 :     register_int_counter!(
     117              15 :         "safekeeper_backed_up_segments_total",
     118              15 :         "Number of WAL segments backed up to the broker"
     119              15 :     )
     120              15 :     .expect("Failed to register safekeeper_backed_up_segments_total counter")
     121              15 : });
     122               1 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
     123               1 :     register_int_counter!(
     124               1 :         "safekeeper_backup_errors_total",
     125               1 :         "Number of errors during backup"
     126               1 :     )
     127               1 :     .expect("Failed to register safekeeper_backup_errors_total counter")
     128               1 : });
     129             500 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     130             500 :     register_histogram!(
     131             500 :         "safekeeper_broker_push_update_seconds",
     132             500 :         "Seconds to push all timeline updates to the broker",
     133             500 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
     134             500 :     )
     135             500 :     .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
     136             500 : });
     137                 : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
     138                 :     1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
     139                 : ];
     140             500 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
     141             500 :     register_histogram!(
     142             500 :         "safekeeper_broker_iteration_timelines",
     143             500 :         "Count of timelines pushed to the broker in a single iteration",
     144             500 :         TIMELINES_COUNT_BUCKETS.to_vec()
     145             500 :     )
     146             500 :     .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
     147             500 : });
     148                 : 
     149                 : pub const LABEL_UNKNOWN: &str = "unknown";
     150                 : 
     151                 : /// Labels for traffic metrics.
     152 UBC           0 : #[derive(Clone)]
     153                 : struct ConnectionLabels {
     154                 :     /// Availability zone of the connection origin.
     155                 :     client_az: String,
     156                 :     /// Availability zone of the current safekeeper.
     157                 :     sk_az: String,
     158                 :     /// Client application name.
     159                 :     app_name: String,
     160                 : }
     161                 : 
     162                 : impl ConnectionLabels {
     163 CBC        3561 :     fn new() -> Self {
     164            3561 :         Self {
     165            3561 :             client_az: LABEL_UNKNOWN.to_string(),
     166            3561 :             sk_az: LABEL_UNKNOWN.to_string(),
     167            3561 :             app_name: LABEL_UNKNOWN.to_string(),
     168            3561 :         }
     169            3561 :     }
     170                 : 
     171            7908 :     fn build_metrics(
     172            7908 :         &self,
     173            7908 :     ) -> (
     174            7908 :         GenericCounter<metrics::core::AtomicU64>,
     175            7908 :         GenericCounter<metrics::core::AtomicU64>,
     176            7908 :     ) {
     177            7908 :         let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
     178            7908 :             (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
     179               8 :             (client_az, sk_az) => {
     180               8 :                 if client_az == sk_az {
     181 UBC           0 :                     "true"
     182                 :                 } else {
     183 CBC           8 :                     "false"
     184                 :                 }
     185                 :             }
     186                 :         };
     187                 : 
     188            7908 :         let read = PG_IO_BYTES.with_label_values(&[
     189            7908 :             &self.client_az,
     190            7908 :             &self.sk_az,
     191            7908 :             &self.app_name,
     192            7908 :             "read",
     193            7908 :             same_az,
     194            7908 :         ]);
     195            7908 :         let write = PG_IO_BYTES.with_label_values(&[
     196            7908 :             &self.client_az,
     197            7908 :             &self.sk_az,
     198            7908 :             &self.app_name,
     199            7908 :             "write",
     200            7908 :             same_az,
     201            7908 :         ]);
     202            7908 :         (read, write)
     203            7908 :     }
     204                 : }
     205                 : 
     206                 : struct TrafficMetricsState {
     207                 :     /// Labels for traffic metrics.
     208                 :     labels: ConnectionLabels,
     209                 :     /// Total bytes read from this connection.
     210                 :     read: GenericCounter<metrics::core::AtomicU64>,
     211                 :     /// Total bytes written to this connection.
     212                 :     write: GenericCounter<metrics::core::AtomicU64>,
     213                 : }
     214                 : 
     215                 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
     216            3561 : #[derive(Clone)]
     217                 : pub struct TrafficMetrics {
     218                 :     state: Arc<RwLock<TrafficMetricsState>>,
     219                 : }
     220                 : 
     221                 : impl Default for TrafficMetrics {
     222 UBC           0 :     fn default() -> Self {
     223               0 :         Self::new()
     224               0 :     }
     225                 : }
     226                 : 
     227                 : impl TrafficMetrics {
     228 CBC        3561 :     pub fn new() -> Self {
     229            3561 :         let labels = ConnectionLabels::new();
     230            3561 :         let (read, write) = labels.build_metrics();
     231            3561 :         let state = TrafficMetricsState {
     232            3561 :             labels,
     233            3561 :             read,
     234            3561 :             write,
     235            3561 :         };
     236            3561 :         Self {
     237            3561 :             state: Arc::new(RwLock::new(state)),
     238            3561 :         }
     239            3561 :     }
     240                 : 
     241               4 :     pub fn set_client_az(&self, value: &str) {
     242               4 :         let mut state = self.state.write().unwrap();
     243               4 :         state.labels.client_az = value.to_string();
     244               4 :         (state.read, state.write) = state.labels.build_metrics();
     245               4 :     }
     246                 : 
     247            3561 :     pub fn set_sk_az(&self, value: &str) {
     248            3561 :         let mut state = self.state.write().unwrap();
     249            3561 :         state.labels.sk_az = value.to_string();
     250            3561 :         (state.read, state.write) = state.labels.build_metrics();
     251            3561 :     }
     252                 : 
     253             782 :     pub fn set_app_name(&self, value: &str) {
     254             782 :         let mut state = self.state.write().unwrap();
     255             782 :         state.labels.app_name = value.to_string();
     256             782 :         (state.read, state.write) = state.labels.build_metrics();
     257             782 :     }
     258                 : 
     259         3539877 :     pub fn observe_read(&self, cnt: usize) {
     260         3539877 :         self.state.read().unwrap().read.inc_by(cnt as u64)
     261         3539877 :     }
     262                 : 
     263         3265120 :     pub fn observe_write(&self, cnt: usize) {
     264         3265120 :         self.state.read().unwrap().write.inc_by(cnt as u64)
     265         3265120 :     }
     266                 : }
     267                 : 
     268                 : /// Metrics for WalStorage in a single timeline.
     269             566 : #[derive(Clone, Default)]
     270                 : pub struct WalStorageMetrics {
     271                 :     /// How many bytes were written in total.
     272                 :     write_wal_bytes: u64,
     273                 :     /// How much time spent writing WAL to disk, waiting for write(2).
     274                 :     write_wal_seconds: f64,
     275                 :     /// How much time spent syncing WAL to disk, waiting for fsync(2).
     276                 :     flush_wal_seconds: f64,
     277                 : }
     278                 : 
     279                 : impl WalStorageMetrics {
     280         1597352 :     pub fn observe_write_bytes(&mut self, bytes: usize) {
     281         1597352 :         self.write_wal_bytes += bytes as u64;
     282         1597352 :         WRITE_WAL_BYTES.observe(bytes as f64);
     283         1597352 :     }
     284                 : 
     285         1597352 :     pub fn observe_write_seconds(&mut self, seconds: f64) {
     286         1597352 :         self.write_wal_seconds += seconds;
     287         1597352 :         WRITE_WAL_SECONDS.observe(seconds);
     288         1597352 :     }
     289                 : 
     290 UBC           0 :     pub fn observe_flush_seconds(&mut self, seconds: f64) {
     291               0 :         self.flush_wal_seconds += seconds;
     292               0 :         FLUSH_WAL_SECONDS.observe(seconds);
     293               0 :     }
     294                 : }
     295                 : 
     296                 : /// Awaits a future that yields an empty result and returns the duration of its execution in seconds; errors are propagated.
     297 CBC     1597352 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
     298         1597352 :     closure: impl Future<Output = Result<(), E>>,
     299         1597352 : ) -> Result<f64> {
     300         1597352 :     let start = std::time::Instant::now();
     301         3131744 :     closure.await.map_err(|e| e.into())?;
     302         1597352 :     Ok(start.elapsed().as_secs_f64())
     303         1597352 : }
     304                 : 
     305                 : /// Metrics for a single timeline.
     306 UBC           0 : #[derive(Clone)]
     307                 : pub struct FullTimelineInfo {
     308                 :     pub ttid: TenantTimelineId,
     309                 :     pub ps_feedback: PageserverFeedback,
     310                 :     pub wal_backup_active: bool,
     311                 :     pub timeline_is_active: bool,
     312                 :     pub num_computes: u32,
     313                 :     pub last_removed_segno: XLogSegNo,
     314                 : 
     315                 :     pub epoch_start_lsn: Lsn,
     316                 :     pub mem_state: SafekeeperMemState,
     317                 :     pub persisted_state: SafeKeeperState,
     318                 : 
     319                 :     pub flush_lsn: Lsn,
     320                 :     pub remote_consistent_lsn: Lsn,
     321                 : 
     322                 :     pub wal_storage: WalStorageMetrics,
     323                 : }
     324                 : 
     325                 : /// Collects metrics for all active timelines.
     326                 : pub struct TimelineCollector {
     327                 :     descs: Vec<Desc>,
     328                 :     commit_lsn: GenericGaugeVec<AtomicU64>,
     329                 :     backup_lsn: GenericGaugeVec<AtomicU64>,
     330                 :     flush_lsn: GenericGaugeVec<AtomicU64>,
     331                 :     epoch_start_lsn: GenericGaugeVec<AtomicU64>,
     332                 :     peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
     333                 :     remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
     334                 :     ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
     335                 :     feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
     336                 :     timeline_active: GenericGaugeVec<AtomicU64>,
     337                 :     wal_backup_active: GenericGaugeVec<AtomicU64>,
     338                 :     connected_computes: IntGaugeVec,
     339                 :     disk_usage: GenericGaugeVec<AtomicU64>,
     340                 :     acceptor_term: GenericGaugeVec<AtomicU64>,
     341                 :     written_wal_bytes: GenericGaugeVec<AtomicU64>,
     342                 :     written_wal_seconds: GaugeVec,
     343                 :     flushed_wal_seconds: GaugeVec,
     344                 :     collect_timeline_metrics: Gauge,
     345                 :     timelines_count: IntGauge,
     346                 : }
     347                 : 
     348                 : impl Default for TimelineCollector {
     349               0 :     fn default() -> Self {
     350               0 :         Self::new()
     351               0 :     }
     352                 : }
     353                 : 
     354                 : impl TimelineCollector {
     355 CBC         500 :     pub fn new() -> TimelineCollector {
     356             500 :         let mut descs = Vec::new();
     357             500 : 
     358             500 :         let commit_lsn = GenericGaugeVec::new(
     359             500 :             Opts::new(
     360             500 :                 "safekeeper_commit_lsn",
     361             500 :                 "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
     362             500 :             ),
     363             500 :             &["tenant_id", "timeline_id"],
     364             500 :         )
     365             500 :         .unwrap();
     366             500 :         descs.extend(commit_lsn.desc().into_iter().cloned());
     367             500 : 
     368             500 :         let backup_lsn = GenericGaugeVec::new(
     369             500 :             Opts::new(
     370             500 :                 "safekeeper_backup_lsn",
     371             500 :                 "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
     372             500 :             ),
     373             500 :             &["tenant_id", "timeline_id"],
     374             500 :         )
     375             500 :         .unwrap();
     376             500 :         descs.extend(backup_lsn.desc().into_iter().cloned());
     377             500 : 
     378             500 :         let flush_lsn = GenericGaugeVec::new(
     379             500 :             Opts::new(
     380             500 :                 "safekeeper_flush_lsn",
     381             500 :                 "Current flush_lsn, grouped by timeline",
     382             500 :             ),
     383             500 :             &["tenant_id", "timeline_id"],
     384             500 :         )
     385             500 :         .unwrap();
     386             500 :         descs.extend(flush_lsn.desc().into_iter().cloned());
     387             500 : 
     388             500 :         let epoch_start_lsn = GenericGaugeVec::new(
     389             500 :             Opts::new(
     390             500 :                 "safekeeper_epoch_start_lsn",
     391             500 :                 "Point since which compute generates new WAL in the current consensus term",
     392             500 :             ),
     393             500 :             &["tenant_id", "timeline_id"],
     394             500 :         )
     395             500 :         .unwrap();
     396             500 :         descs.extend(epoch_start_lsn.desc().into_iter().cloned());
     397             500 : 
     398             500 :         let peer_horizon_lsn = GenericGaugeVec::new(
     399             500 :             Opts::new(
     400             500 :                 "safekeeper_peer_horizon_lsn",
     401             500 :                 "LSN of the most lagging safekeeper",
     402             500 :             ),
     403             500 :             &["tenant_id", "timeline_id"],
     404             500 :         )
     405             500 :         .unwrap();
     406             500 :         descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
     407             500 : 
     408             500 :         let remote_consistent_lsn = GenericGaugeVec::new(
     409             500 :             Opts::new(
     410             500 :                 "safekeeper_remote_consistent_lsn",
     411             500 :                 "LSN which is persisted to the remote storage in pageserver",
     412             500 :             ),
     413             500 :             &["tenant_id", "timeline_id"],
     414             500 :         )
     415             500 :         .unwrap();
     416             500 :         descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
     417             500 : 
     418             500 :         let ps_last_received_lsn = GenericGaugeVec::new(
     419             500 :             Opts::new(
     420             500 :                 "safekeeper_ps_last_received_lsn",
     421             500 :                 "Last LSN received by the pageserver, acknowledged in the feedback",
     422             500 :             ),
     423             500 :             &["tenant_id", "timeline_id"],
     424             500 :         )
     425             500 :         .unwrap();
     426             500 :         descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
     427             500 : 
     428             500 :         let feedback_last_time_seconds = GenericGaugeVec::new(
     429             500 :             Opts::new(
     430             500 :                 "safekeeper_feedback_last_time_seconds",
     431             500 :                 "Timestamp of the last feedback from the pageserver",
     432             500 :             ),
     433             500 :             &["tenant_id", "timeline_id"],
     434             500 :         )
     435             500 :         .unwrap();
     436             500 :         descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
     437             500 : 
     438             500 :         let timeline_active = GenericGaugeVec::new(
     439             500 :             Opts::new(
     440             500 :                 "safekeeper_timeline_active",
     441             500 :                 "Reports 1 for active timelines, 0 for inactive",
     442             500 :             ),
     443             500 :             &["tenant_id", "timeline_id"],
     444             500 :         )
     445             500 :         .unwrap();
     446             500 :         descs.extend(timeline_active.desc().into_iter().cloned());
     447             500 : 
     448             500 :         let wal_backup_active = GenericGaugeVec::new(
     449             500 :             Opts::new(
     450             500 :                 "safekeeper_wal_backup_active",
     451             500 :                 "Reports 1 for timelines with active WAL backup, 0 otherwise",
     452             500 :             ),
     453             500 :             &["tenant_id", "timeline_id"],
     454             500 :         )
     455             500 :         .unwrap();
     456             500 :         descs.extend(wal_backup_active.desc().into_iter().cloned());
     457             500 : 
     458             500 :         let connected_computes = IntGaugeVec::new(
     459             500 :             Opts::new(
     460             500 :                 "safekeeper_connected_computes",
     461             500 :                 "Number of active compute connections",
     462             500 :             ),
     463             500 :             &["tenant_id", "timeline_id"],
     464             500 :         )
     465             500 :         .unwrap();
     466             500 :         descs.extend(connected_computes.desc().into_iter().cloned());
     467             500 : 
     468             500 :         let disk_usage = GenericGaugeVec::new(
     469             500 :             Opts::new(
     470             500 :                 "safekeeper_disk_usage_bytes",
     471             500 :                 "Estimated disk space used to store WAL segments",
     472             500 :             ),
     473             500 :             &["tenant_id", "timeline_id"],
     474             500 :         )
     475             500 :         .unwrap();
     476             500 :         descs.extend(disk_usage.desc().into_iter().cloned());
     477             500 : 
     478             500 :         let acceptor_term = GenericGaugeVec::new(
     479             500 :             Opts::new("safekeeper_acceptor_term", "Current consensus term"),
     480             500 :             &["tenant_id", "timeline_id"],
     481             500 :         )
     482             500 :         .unwrap();
     483             500 :         descs.extend(acceptor_term.desc().into_iter().cloned());
     484             500 : 
     485             500 :         let written_wal_bytes = GenericGaugeVec::new(
     486             500 :             Opts::new(
     487             500 :                 "safekeeper_written_wal_bytes_total",
     488             500 :                 "Number of WAL bytes written to disk, grouped by timeline",
     489             500 :             ),
     490             500 :             &["tenant_id", "timeline_id"],
     491             500 :         )
     492             500 :         .unwrap();
     493             500 :         descs.extend(written_wal_bytes.desc().into_iter().cloned());
     494             500 : 
     495             500 :         let written_wal_seconds = GaugeVec::new(
     496             500 :             Opts::new(
     497             500 :                 "safekeeper_written_wal_seconds_total",
     498             500 :                 "Total time spent in write(2) writing WAL to disk, grouped by timeline",
     499             500 :             ),
     500             500 :             &["tenant_id", "timeline_id"],
     501             500 :         )
     502             500 :         .unwrap();
     503             500 :         descs.extend(written_wal_seconds.desc().into_iter().cloned());
     504             500 : 
     505             500 :         let flushed_wal_seconds = GaugeVec::new(
     506             500 :             Opts::new(
     507             500 :                 "safekeeper_flushed_wal_seconds_total",
     508             500 :                 "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
     509             500 :             ),
     510             500 :             &["tenant_id", "timeline_id"],
     511             500 :         )
     512             500 :         .unwrap();
     513             500 :         descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
     514             500 : 
     515             500 :         let collect_timeline_metrics = Gauge::new(
     516             500 :             "safekeeper_collect_timeline_metrics_seconds",
     517             500 :             "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
     518             500 :         )
     519             500 :         .unwrap();
     520             500 :         descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
     521             500 : 
     522             500 :         let timelines_count = IntGauge::new(
     523             500 :             "safekeeper_timelines",
     524             500 :             "Total number of timelines loaded in-memory",
     525             500 :         )
     526             500 :         .unwrap();
     527             500 :         descs.extend(timelines_count.desc().into_iter().cloned());
     528             500 : 
     529             500 :         TimelineCollector {
     530             500 :             descs,
     531             500 :             commit_lsn,
     532             500 :             backup_lsn,
     533             500 :             flush_lsn,
     534             500 :             epoch_start_lsn,
     535             500 :             peer_horizon_lsn,
     536             500 :             remote_consistent_lsn,
     537             500 :             ps_last_received_lsn,
     538             500 :             feedback_last_time_seconds,
     539             500 :             timeline_active,
     540             500 :             wal_backup_active,
     541             500 :             connected_computes,
     542             500 :             disk_usage,
     543             500 :             acceptor_term,
     544             500 :             written_wal_bytes,
     545             500 :             written_wal_seconds,
     546             500 :             flushed_wal_seconds,
     547             500 :             collect_timeline_metrics,
     548             500 :             timelines_count,
     549             500 :         }
     550             500 :     }
     551                 : }
     552                 : 
     553                 : impl Collector for TimelineCollector {
     554             500 :     fn desc(&self) -> Vec<&Desc> {
     555             500 :         self.descs.iter().collect()
     556             500 :     }
     557                 : 
     558              19 :     fn collect(&self) -> Vec<MetricFamily> {
     559              19 :         let start_collecting = Instant::now();
     560              19 : 
     561              19 :         // reset all metrics to clean up inactive timelines
     562              19 :         self.commit_lsn.reset();
     563              19 :         self.backup_lsn.reset();
     564              19 :         self.flush_lsn.reset();
     565              19 :         self.epoch_start_lsn.reset();
     566              19 :         self.peer_horizon_lsn.reset();
     567              19 :         self.remote_consistent_lsn.reset();
     568              19 :         self.ps_last_received_lsn.reset();
     569              19 :         self.feedback_last_time_seconds.reset();
     570              19 :         self.timeline_active.reset();
     571              19 :         self.wal_backup_active.reset();
     572              19 :         self.connected_computes.reset();
     573              19 :         self.disk_usage.reset();
     574              19 :         self.acceptor_term.reset();
     575              19 :         self.written_wal_bytes.reset();
     576              19 :         self.written_wal_seconds.reset();
     577              19 :         self.flushed_wal_seconds.reset();
     578              19 : 
     579              19 :         let timelines = GlobalTimelines::get_all();
     580              19 :         let timelines_count = timelines.len();
     581              19 : 
     582              19 :         // Prometheus Collector is sync, and data is stored under async lock. To
     583              19 :         // bridge the gap with a crutch, collect data in spawned thread with
     584              19 :         // local tokio runtime.
     585              19 :         let infos = std::thread::spawn(|| {
     586              19 :             let rt = tokio::runtime::Builder::new_current_thread()
     587              19 :                 .build()
     588              19 :                 .expect("failed to create rt");
     589              19 :             rt.block_on(collect_timeline_metrics())
     590              19 :         })
     591              19 :         .join()
     592              19 :         .expect("collect_timeline_metrics thread panicked");
     593                 : 
     594              70 :         for tli in &infos {
     595              51 :             let tenant_id = tli.ttid.tenant_id.to_string();
     596              51 :             let timeline_id = tli.ttid.timeline_id.to_string();
     597              51 :             let labels = &[tenant_id.as_str(), timeline_id.as_str()];
     598              51 : 
     599              51 :             self.commit_lsn
     600              51 :                 .with_label_values(labels)
     601              51 :                 .set(tli.mem_state.commit_lsn.into());
     602              51 :             self.backup_lsn
     603              51 :                 .with_label_values(labels)
     604              51 :                 .set(tli.mem_state.backup_lsn.into());
     605              51 :             self.flush_lsn
     606              51 :                 .with_label_values(labels)
     607              51 :                 .set(tli.flush_lsn.into());
     608              51 :             self.epoch_start_lsn
     609              51 :                 .with_label_values(labels)
     610              51 :                 .set(tli.epoch_start_lsn.into());
     611              51 :             self.peer_horizon_lsn
     612              51 :                 .with_label_values(labels)
     613              51 :                 .set(tli.mem_state.peer_horizon_lsn.into());
     614              51 :             self.remote_consistent_lsn
     615              51 :                 .with_label_values(labels)
     616              51 :                 .set(tli.remote_consistent_lsn.into());
     617              51 :             self.timeline_active
     618              51 :                 .with_label_values(labels)
     619              51 :                 .set(tli.timeline_is_active as u64);
     620              51 :             self.wal_backup_active
     621              51 :                 .with_label_values(labels)
     622              51 :                 .set(tli.wal_backup_active as u64);
     623              51 :             self.connected_computes
     624              51 :                 .with_label_values(labels)
     625              51 :                 .set(tli.num_computes as i64);
     626              51 :             self.acceptor_term
     627              51 :                 .with_label_values(labels)
     628              51 :                 .set(tli.persisted_state.acceptor_state.term);
     629              51 :             self.written_wal_bytes
     630              51 :                 .with_label_values(labels)
     631              51 :                 .set(tli.wal_storage.write_wal_bytes);
     632              51 :             self.written_wal_seconds
     633              51 :                 .with_label_values(labels)
     634              51 :                 .set(tli.wal_storage.write_wal_seconds);
     635              51 :             self.flushed_wal_seconds
     636              51 :                 .with_label_values(labels)
     637              51 :                 .set(tli.wal_storage.flush_wal_seconds);
     638              51 : 
     639              51 :             self.ps_last_received_lsn
     640              51 :                 .with_label_values(labels)
     641              51 :                 .set(tli.ps_feedback.last_received_lsn.0);
     642              51 :             if let Ok(unix_time) = tli
     643              51 :                 .ps_feedback
     644              51 :                 .replytime
     645              51 :                 .duration_since(SystemTime::UNIX_EPOCH)
     646              51 :             {
     647              51 :                 self.feedback_last_time_seconds
     648              51 :                     .with_label_values(labels)
     649              51 :                     .set(unix_time.as_secs());
     650              51 :             }
     651                 : 
     652              51 :             if tli.last_removed_segno != 0 {
     653 UBC           0 :                 let segno_count = tli
     654               0 :                     .flush_lsn
     655               0 :                     .segment_number(tli.persisted_state.server.wal_seg_size as usize)
     656               0 :                     - tli.last_removed_segno;
     657               0 :                 let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
     658               0 :                 self.disk_usage
     659               0 :                     .with_label_values(labels)
     660               0 :                     .set(disk_usage_bytes);
     661 CBC          51 :             }
     662                 :         }
     663                 : 
     664                 :         // collect MetricFamilys.
     665              19 :         let mut mfs = Vec::new();
     666              19 :         mfs.extend(self.commit_lsn.collect());
     667              19 :         mfs.extend(self.backup_lsn.collect());
     668              19 :         mfs.extend(self.flush_lsn.collect());
     669              19 :         mfs.extend(self.epoch_start_lsn.collect());
     670              19 :         mfs.extend(self.peer_horizon_lsn.collect());
     671              19 :         mfs.extend(self.remote_consistent_lsn.collect());
     672              19 :         mfs.extend(self.ps_last_received_lsn.collect());
     673              19 :         mfs.extend(self.feedback_last_time_seconds.collect());
     674              19 :         mfs.extend(self.timeline_active.collect());
     675              19 :         mfs.extend(self.wal_backup_active.collect());
     676              19 :         mfs.extend(self.connected_computes.collect());
     677              19 :         mfs.extend(self.disk_usage.collect());
     678              19 :         mfs.extend(self.acceptor_term.collect());
     679              19 :         mfs.extend(self.written_wal_bytes.collect());
     680              19 :         mfs.extend(self.written_wal_seconds.collect());
     681              19 :         mfs.extend(self.flushed_wal_seconds.collect());
     682              19 : 
     683              19 :         // report time it took to collect all info
     684              19 :         let elapsed = start_collecting.elapsed().as_secs_f64();
     685              19 :         self.collect_timeline_metrics.set(elapsed);
     686              19 :         mfs.extend(self.collect_timeline_metrics.collect());
     687              19 : 
     688              19 :         // report total number of timelines
     689              19 :         self.timelines_count.set(timelines_count as i64);
     690              19 :         mfs.extend(self.timelines_count.collect());
     691              19 : 
     692              19 :         mfs
     693              19 :     }
     694                 : }
     695                 : 
     696              19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
     697              19 :     let mut res = vec![];
     698              19 :     let timelines = GlobalTimelines::get_all();
     699                 : 
     700              70 :     for tli in timelines {
     701              51 :         if let Some(info) = tli.info_for_metrics().await {
     702              51 :             res.push(info);
     703              51 :         }
     704                 :     }
     705              19 :     res
     706              19 : }
        

Generated by: LCOV version 2.1-beta