LCOV - differential code coverage report
Current view: top level - safekeeper/src - metrics.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 93.4 % 547 511 7 29 511
Current Date: 2024-01-09 02:06:09 Functions: 73.3 % 60 44 2 14 44
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Global safekeeper mertics and per-timeline safekeeper metrics.
       2                 : 
       3                 : use std::{
       4                 :     sync::{Arc, RwLock},
       5                 :     time::{Instant, SystemTime},
       6                 : };
       7                 : 
       8                 : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
       9                 : use anyhow::Result;
      10                 : use futures::Future;
      11                 : use metrics::{
      12                 :     core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
      13                 :     proto::MetricFamily,
      14                 :     register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
      15                 :     IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
      16                 : };
      17                 : use once_cell::sync::Lazy;
      18                 : 
      19                 : use postgres_ffi::XLogSegNo;
      20                 : use utils::pageserver_feedback::PageserverFeedback;
      21                 : use utils::{id::TenantTimelineId, lsn::Lsn};
      22                 : 
      23                 : use crate::{
      24                 :     safekeeper::{SafeKeeperState, SafekeeperMemState},
      25                 :     GlobalTimelines,
      26                 : };
      27                 : 
      28                 : // Global metrics across all timelines.
      29 CBC         411 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
      30             411 :     register_histogram!(
      31             411 :         "safekeeper_write_wal_bytes",
      32             411 :         "Bytes written to WAL in a single request",
      33             411 :         vec![
      34             411 :             1.0,
      35             411 :             10.0,
      36             411 :             100.0,
      37             411 :             1024.0,
      38             411 :             8192.0,
      39             411 :             128.0 * 1024.0,
      40             411 :             1024.0 * 1024.0,
      41             411 :             10.0 * 1024.0 * 1024.0
      42             411 :         ]
      43             411 :     )
      44             411 :     .expect("Failed to register safekeeper_write_wal_bytes histogram")
      45             411 : });
      46             411 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      47             411 :     register_histogram!(
      48             411 :         "safekeeper_write_wal_seconds",
      49             411 :         "Seconds spent writing and syncing WAL to a disk in a single request",
      50             411 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      51             411 :     )
      52             411 :     .expect("Failed to register safekeeper_write_wal_seconds histogram")
      53             411 : });
      54 UBC           0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      55               0 :     register_histogram!(
      56               0 :         "safekeeper_flush_wal_seconds",
      57               0 :         "Seconds spent syncing WAL to a disk",
      58               0 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      59               0 :     )
      60               0 :     .expect("Failed to register safekeeper_flush_wal_seconds histogram")
      61               0 : });
      62 CBC         416 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      63             416 :     register_histogram!(
      64             416 :         "safekeeper_persist_control_file_seconds",
      65             416 :         "Seconds to persist and sync control file",
      66             416 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      67             416 :     )
      68             416 :     .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
      69             416 : });
      70             418 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
      71             418 :     register_int_counter_vec!(
      72             418 :         "safekeeper_pg_io_bytes_total",
      73             418 :         "Bytes read from or written to any PostgreSQL connection",
      74             418 :         &["client_az", "sk_az", "app_name", "dir", "same_az"]
      75             418 :     )
      76             418 :     .expect("Failed to register safekeeper_pg_io_bytes gauge")
      77             418 : });
      78             407 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
      79             407 :     register_int_counter!(
      80             407 :         "safekeeper_broker_pushed_updates_total",
      81             407 :         "Number of timeline updates pushed to the broker"
      82             407 :     )
      83             407 :     .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
      84             407 : });
      85             485 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
      86             485 :     register_int_counter_vec!(
      87             485 :         "safekeeper_broker_pulled_updates_total",
      88             485 :         "Number of timeline updates pulled and processed from the broker",
      89             485 :         &["result"]
      90             485 :     )
      91             485 :     .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
      92             485 : });
      93             418 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
      94             836 :     register_int_counter_pair_vec!(
      95             836 :         "safekeeper_pg_queries_received_total",
      96             836 :         "Number of queries received through pg protocol",
      97             836 :         "safekeeper_pg_queries_finished_total",
      98             836 :         "Number of queries finished through pg protocol",
      99             836 :         &["query"]
     100             836 :     )
     101             418 :     .expect("Failed to register safekeeper_pg_queries_finished_total counter")
     102             418 : });
     103               8 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     104               8 :     register_int_counter!(
     105               8 :         "safekeeper_removed_wal_segments_total",
     106               8 :         "Number of WAL segments removed from the disk"
     107               8 :     )
     108               8 :     .expect("Failed to register safekeeper_removed_wal_segments_total counter")
     109               8 : });
     110               6 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     111               6 :     register_int_counter!(
     112               6 :         "safekeeper_backed_up_segments_total",
     113               6 :         "Number of WAL segments backed up to the broker"
     114               6 :     )
     115               6 :     .expect("Failed to register safekeeper_backed_up_segments_total counter")
     116               6 : });
     117 LBC         (2) : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
     118             (2) :     register_int_counter!(
     119             (2) :         "safekeeper_backup_errors_total",
     120             (2) :         "Number of errors during backup"
     121             (2) :     )
     122             (2) :     .expect("Failed to register safekeeper_backup_errors_total counter")
     123             (2) : });
     124 CBC         485 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     125             485 :     register_histogram!(
     126             485 :         "safekeeper_broker_push_update_seconds",
     127             485 :         "Seconds to push all timeline updates to the broker",
     128             485 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
     129             485 :     )
     130             485 :     .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
     131             485 : });
     132                 : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
     133                 :     1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
     134                 : ];
     135             485 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
     136             485 :     register_histogram!(
     137             485 :         "safekeeper_broker_iteration_timelines",
     138             485 :         "Count of timelines pushed to the broker in a single iteration",
     139             485 :         TIMELINES_COUNT_BUCKETS.to_vec()
     140             485 :     )
     141             485 :     .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
     142             485 : });
     143                 : 
     144                 : pub const LABEL_UNKNOWN: &str = "unknown";
     145                 : 
     146                 : /// Labels for traffic metrics.
     147 UBC           0 : #[derive(Clone)]
     148                 : struct ConnectionLabels {
     149                 :     /// Availability zone of the connection origin.
     150                 :     client_az: String,
     151                 :     /// Availability zone of the current safekeeper.
     152                 :     sk_az: String,
     153                 :     /// Client application name.
     154                 :     app_name: String,
     155                 : }
     156                 : 
     157                 : impl ConnectionLabels {
     158 CBC        3123 :     fn new() -> Self {
     159            3123 :         Self {
     160            3123 :             client_az: LABEL_UNKNOWN.to_string(),
     161            3123 :             sk_az: LABEL_UNKNOWN.to_string(),
     162            3123 :             app_name: LABEL_UNKNOWN.to_string(),
     163            3123 :         }
     164            3123 :     }
     165                 : 
     166            6971 :     fn build_metrics(
     167            6971 :         &self,
     168            6971 :     ) -> (
     169            6971 :         GenericCounter<metrics::core::AtomicU64>,
     170            6971 :         GenericCounter<metrics::core::AtomicU64>,
     171            6971 :     ) {
     172            6971 :         let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
     173            6971 :             (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
     174              10 :             (client_az, sk_az) => {
     175              10 :                 if client_az == sk_az {
     176 UBC           0 :                     "true"
     177                 :                 } else {
     178 CBC          10 :                     "false"
     179                 :                 }
     180                 :             }
     181                 :         };
     182                 : 
     183            6971 :         let read = PG_IO_BYTES.with_label_values(&[
     184            6971 :             &self.client_az,
     185            6971 :             &self.sk_az,
     186            6971 :             &self.app_name,
     187            6971 :             "read",
     188            6971 :             same_az,
     189            6971 :         ]);
     190            6971 :         let write = PG_IO_BYTES.with_label_values(&[
     191            6971 :             &self.client_az,
     192            6971 :             &self.sk_az,
     193            6971 :             &self.app_name,
     194            6971 :             "write",
     195            6971 :             same_az,
     196            6971 :         ]);
     197            6971 :         (read, write)
     198            6971 :     }
     199                 : }
     200                 : 
     201                 : struct TrafficMetricsState {
     202                 :     /// Labels for traffic metrics.
     203                 :     labels: ConnectionLabels,
     204                 :     /// Total bytes read from this connection.
     205                 :     read: GenericCounter<metrics::core::AtomicU64>,
     206                 :     /// Total bytes written to this connection.
     207                 :     write: GenericCounter<metrics::core::AtomicU64>,
     208                 : }
     209                 : 
     210                 : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
     211            3123 : #[derive(Clone)]
     212                 : pub struct TrafficMetrics {
     213                 :     state: Arc<RwLock<TrafficMetricsState>>,
     214                 : }
     215                 : 
     216                 : impl Default for TrafficMetrics {
     217 UBC           0 :     fn default() -> Self {
     218               0 :         Self::new()
     219               0 :     }
     220                 : }
     221                 : 
     222                 : impl TrafficMetrics {
     223 CBC        3123 :     pub fn new() -> Self {
     224            3123 :         let labels = ConnectionLabels::new();
     225            3123 :         let (read, write) = labels.build_metrics();
     226            3123 :         let state = TrafficMetricsState {
     227            3123 :             labels,
     228            3123 :             read,
     229            3123 :             write,
     230            3123 :         };
     231            3123 :         Self {
     232            3123 :             state: Arc::new(RwLock::new(state)),
     233            3123 :         }
     234            3123 :     }
     235                 : 
     236               5 :     pub fn set_client_az(&self, value: &str) {
     237               5 :         let mut state = self.state.write().unwrap();
     238               5 :         state.labels.client_az = value.to_string();
     239               5 :         (state.read, state.write) = state.labels.build_metrics();
     240               5 :     }
     241                 : 
     242            3123 :     pub fn set_sk_az(&self, value: &str) {
     243            3123 :         let mut state = self.state.write().unwrap();
     244            3123 :         state.labels.sk_az = value.to_string();
     245            3123 :         (state.read, state.write) = state.labels.build_metrics();
     246            3123 :     }
     247                 : 
     248             720 :     pub fn set_app_name(&self, value: &str) {
     249             720 :         let mut state = self.state.write().unwrap();
     250             720 :         state.labels.app_name = value.to_string();
     251             720 :         (state.read, state.write) = state.labels.build_metrics();
     252             720 :     }
     253                 : 
     254         2177218 :     pub fn observe_read(&self, cnt: usize) {
     255         2177218 :         self.state.read().unwrap().read.inc_by(cnt as u64)
     256         2177218 :     }
     257                 : 
     258         1883400 :     pub fn observe_write(&self, cnt: usize) {
     259         1883400 :         self.state.read().unwrap().write.inc_by(cnt as u64)
     260         1883400 :     }
     261                 : }
     262                 : 
     263                 : /// Metrics for WalStorage in a single timeline.
     264             631 : #[derive(Clone, Default)]
     265                 : pub struct WalStorageMetrics {
     266                 :     /// How much bytes were written in total.
     267                 :     write_wal_bytes: u64,
     268                 :     /// How much time spent writing WAL to disk, waiting for write(2).
     269                 :     write_wal_seconds: f64,
     270                 :     /// How much time spent syncing WAL to disk, waiting for fsync(2).
     271                 :     flush_wal_seconds: f64,
     272                 : }
     273                 : 
     274                 : impl WalStorageMetrics {
     275         1216618 :     pub fn observe_write_bytes(&mut self, bytes: usize) {
     276         1216618 :         self.write_wal_bytes += bytes as u64;
     277         1216618 :         WRITE_WAL_BYTES.observe(bytes as f64);
     278         1216618 :     }
     279                 : 
     280         1216618 :     pub fn observe_write_seconds(&mut self, seconds: f64) {
     281         1216618 :         self.write_wal_seconds += seconds;
     282         1216618 :         WRITE_WAL_SECONDS.observe(seconds);
     283         1216618 :     }
     284                 : 
     285 UBC           0 :     pub fn observe_flush_seconds(&mut self, seconds: f64) {
     286               0 :         self.flush_wal_seconds += seconds;
     287               0 :         FLUSH_WAL_SECONDS.observe(seconds);
     288               0 :     }
     289                 : }
     290                 : 
     291                 : /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
     292 CBC     1216619 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
     293         1216619 :     closure: impl Future<Output = Result<(), E>>,
     294         1216619 : ) -> Result<f64> {
     295         1216619 :     let start = std::time::Instant::now();
     296         2485345 :     closure.await.map_err(|e| e.into())?;
     297         1216618 :     Ok(start.elapsed().as_secs_f64())
     298         1216618 : }
     299                 : 
     300                 : /// Metrics for a single timeline.
     301 UBC           0 : #[derive(Clone)]
     302                 : pub struct FullTimelineInfo {
     303                 :     pub ttid: TenantTimelineId,
     304                 :     pub ps_feedback: PageserverFeedback,
     305                 :     pub wal_backup_active: bool,
     306                 :     pub timeline_is_active: bool,
     307                 :     pub num_computes: u32,
     308                 :     pub last_removed_segno: XLogSegNo,
     309                 : 
     310                 :     pub epoch_start_lsn: Lsn,
     311                 :     pub mem_state: SafekeeperMemState,
     312                 :     pub persisted_state: SafeKeeperState,
     313                 : 
     314                 :     pub flush_lsn: Lsn,
     315                 :     pub remote_consistent_lsn: Lsn,
     316                 : 
     317                 :     pub wal_storage: WalStorageMetrics,
     318                 : }
     319                 : 
     320                 : /// Collects metrics for all active timelines.
     321                 : pub struct TimelineCollector {
     322                 :     descs: Vec<Desc>,
     323                 :     commit_lsn: GenericGaugeVec<AtomicU64>,
     324                 :     backup_lsn: GenericGaugeVec<AtomicU64>,
     325                 :     flush_lsn: GenericGaugeVec<AtomicU64>,
     326                 :     epoch_start_lsn: GenericGaugeVec<AtomicU64>,
     327                 :     peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
     328                 :     remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
     329                 :     ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
     330                 :     feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
     331                 :     timeline_active: GenericGaugeVec<AtomicU64>,
     332                 :     wal_backup_active: GenericGaugeVec<AtomicU64>,
     333                 :     connected_computes: IntGaugeVec,
     334                 :     disk_usage: GenericGaugeVec<AtomicU64>,
     335                 :     acceptor_term: GenericGaugeVec<AtomicU64>,
     336                 :     written_wal_bytes: GenericGaugeVec<AtomicU64>,
     337                 :     written_wal_seconds: GaugeVec,
     338                 :     flushed_wal_seconds: GaugeVec,
     339                 :     collect_timeline_metrics: Gauge,
     340                 :     timelines_count: IntGauge,
     341                 : }
     342                 : 
     343                 : impl Default for TimelineCollector {
     344               0 :     fn default() -> Self {
     345               0 :         Self::new()
     346               0 :     }
     347                 : }
     348                 : 
     349                 : impl TimelineCollector {
     350 CBC         485 :     pub fn new() -> TimelineCollector {
     351             485 :         let mut descs = Vec::new();
     352             485 : 
     353             485 :         let commit_lsn = GenericGaugeVec::new(
     354             485 :             Opts::new(
     355             485 :                 "safekeeper_commit_lsn",
     356             485 :                 "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
     357             485 :             ),
     358             485 :             &["tenant_id", "timeline_id"],
     359             485 :         )
     360             485 :         .unwrap();
     361             485 :         descs.extend(commit_lsn.desc().into_iter().cloned());
     362             485 : 
     363             485 :         let backup_lsn = GenericGaugeVec::new(
     364             485 :             Opts::new(
     365             485 :                 "safekeeper_backup_lsn",
     366             485 :                 "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
     367             485 :             ),
     368             485 :             &["tenant_id", "timeline_id"],
     369             485 :         )
     370             485 :         .unwrap();
     371             485 :         descs.extend(backup_lsn.desc().into_iter().cloned());
     372             485 : 
     373             485 :         let flush_lsn = GenericGaugeVec::new(
     374             485 :             Opts::new(
     375             485 :                 "safekeeper_flush_lsn",
     376             485 :                 "Current flush_lsn, grouped by timeline",
     377             485 :             ),
     378             485 :             &["tenant_id", "timeline_id"],
     379             485 :         )
     380             485 :         .unwrap();
     381             485 :         descs.extend(flush_lsn.desc().into_iter().cloned());
     382             485 : 
     383             485 :         let epoch_start_lsn = GenericGaugeVec::new(
     384             485 :             Opts::new(
     385             485 :                 "safekeeper_epoch_start_lsn",
     386             485 :                 "Point since which compute generates new WAL in the current consensus term",
     387             485 :             ),
     388             485 :             &["tenant_id", "timeline_id"],
     389             485 :         )
     390             485 :         .unwrap();
     391             485 :         descs.extend(epoch_start_lsn.desc().into_iter().cloned());
     392             485 : 
     393             485 :         let peer_horizon_lsn = GenericGaugeVec::new(
     394             485 :             Opts::new(
     395             485 :                 "safekeeper_peer_horizon_lsn",
     396             485 :                 "LSN of the most lagging safekeeper",
     397             485 :             ),
     398             485 :             &["tenant_id", "timeline_id"],
     399             485 :         )
     400             485 :         .unwrap();
     401             485 :         descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
     402             485 : 
     403             485 :         let remote_consistent_lsn = GenericGaugeVec::new(
     404             485 :             Opts::new(
     405             485 :                 "safekeeper_remote_consistent_lsn",
     406             485 :                 "LSN which is persisted to the remote storage in pageserver",
     407             485 :             ),
     408             485 :             &["tenant_id", "timeline_id"],
     409             485 :         )
     410             485 :         .unwrap();
     411             485 :         descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
     412             485 : 
     413             485 :         let ps_last_received_lsn = GenericGaugeVec::new(
     414             485 :             Opts::new(
     415             485 :                 "safekeeper_ps_last_received_lsn",
     416             485 :                 "Last LSN received by the pageserver, acknowledged in the feedback",
     417             485 :             ),
     418             485 :             &["tenant_id", "timeline_id"],
     419             485 :         )
     420             485 :         .unwrap();
     421             485 :         descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
     422             485 : 
     423             485 :         let feedback_last_time_seconds = GenericGaugeVec::new(
     424             485 :             Opts::new(
     425             485 :                 "safekeeper_feedback_last_time_seconds",
     426             485 :                 "Timestamp of the last feedback from the pageserver",
     427             485 :             ),
     428             485 :             &["tenant_id", "timeline_id"],
     429             485 :         )
     430             485 :         .unwrap();
     431             485 :         descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
     432             485 : 
     433             485 :         let timeline_active = GenericGaugeVec::new(
     434             485 :             Opts::new(
     435             485 :                 "safekeeper_timeline_active",
     436             485 :                 "Reports 1 for active timelines, 0 for inactive",
     437             485 :             ),
     438             485 :             &["tenant_id", "timeline_id"],
     439             485 :         )
     440             485 :         .unwrap();
     441             485 :         descs.extend(timeline_active.desc().into_iter().cloned());
     442             485 : 
     443             485 :         let wal_backup_active = GenericGaugeVec::new(
     444             485 :             Opts::new(
     445             485 :                 "safekeeper_wal_backup_active",
     446             485 :                 "Reports 1 for timelines with active WAL backup, 0 otherwise",
     447             485 :             ),
     448             485 :             &["tenant_id", "timeline_id"],
     449             485 :         )
     450             485 :         .unwrap();
     451             485 :         descs.extend(wal_backup_active.desc().into_iter().cloned());
     452             485 : 
     453             485 :         let connected_computes = IntGaugeVec::new(
     454             485 :             Opts::new(
     455             485 :                 "safekeeper_connected_computes",
     456             485 :                 "Number of active compute connections",
     457             485 :             ),
     458             485 :             &["tenant_id", "timeline_id"],
     459             485 :         )
     460             485 :         .unwrap();
     461             485 :         descs.extend(connected_computes.desc().into_iter().cloned());
     462             485 : 
     463             485 :         let disk_usage = GenericGaugeVec::new(
     464             485 :             Opts::new(
     465             485 :                 "safekeeper_disk_usage_bytes",
     466             485 :                 "Estimated disk space used to store WAL segments",
     467             485 :             ),
     468             485 :             &["tenant_id", "timeline_id"],
     469             485 :         )
     470             485 :         .unwrap();
     471             485 :         descs.extend(disk_usage.desc().into_iter().cloned());
     472             485 : 
     473             485 :         let acceptor_term = GenericGaugeVec::new(
     474             485 :             Opts::new("safekeeper_acceptor_term", "Current consensus term"),
     475             485 :             &["tenant_id", "timeline_id"],
     476             485 :         )
     477             485 :         .unwrap();
     478             485 :         descs.extend(acceptor_term.desc().into_iter().cloned());
     479             485 : 
     480             485 :         let written_wal_bytes = GenericGaugeVec::new(
     481             485 :             Opts::new(
     482             485 :                 "safekeeper_written_wal_bytes_total",
     483             485 :                 "Number of WAL bytes written to disk, grouped by timeline",
     484             485 :             ),
     485             485 :             &["tenant_id", "timeline_id"],
     486             485 :         )
     487             485 :         .unwrap();
     488             485 :         descs.extend(written_wal_bytes.desc().into_iter().cloned());
     489             485 : 
     490             485 :         let written_wal_seconds = GaugeVec::new(
     491             485 :             Opts::new(
     492             485 :                 "safekeeper_written_wal_seconds_total",
     493             485 :                 "Total time spent in write(2) writing WAL to disk, grouped by timeline",
     494             485 :             ),
     495             485 :             &["tenant_id", "timeline_id"],
     496             485 :         )
     497             485 :         .unwrap();
     498             485 :         descs.extend(written_wal_seconds.desc().into_iter().cloned());
     499             485 : 
     500             485 :         let flushed_wal_seconds = GaugeVec::new(
     501             485 :             Opts::new(
     502             485 :                 "safekeeper_flushed_wal_seconds_total",
     503             485 :                 "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
     504             485 :             ),
     505             485 :             &["tenant_id", "timeline_id"],
     506             485 :         )
     507             485 :         .unwrap();
     508             485 :         descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
     509             485 : 
     510             485 :         let collect_timeline_metrics = Gauge::new(
     511             485 :             "safekeeper_collect_timeline_metrics_seconds",
     512             485 :             "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
     513             485 :         )
     514             485 :         .unwrap();
     515             485 :         descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
     516             485 : 
     517             485 :         let timelines_count = IntGauge::new(
     518             485 :             "safekeeper_timelines",
     519             485 :             "Total number of timelines loaded in-memory",
     520             485 :         )
     521             485 :         .unwrap();
     522             485 :         descs.extend(timelines_count.desc().into_iter().cloned());
     523             485 : 
     524             485 :         TimelineCollector {
     525             485 :             descs,
     526             485 :             commit_lsn,
     527             485 :             backup_lsn,
     528             485 :             flush_lsn,
     529             485 :             epoch_start_lsn,
     530             485 :             peer_horizon_lsn,
     531             485 :             remote_consistent_lsn,
     532             485 :             ps_last_received_lsn,
     533             485 :             feedback_last_time_seconds,
     534             485 :             timeline_active,
     535             485 :             wal_backup_active,
     536             485 :             connected_computes,
     537             485 :             disk_usage,
     538             485 :             acceptor_term,
     539             485 :             written_wal_bytes,
     540             485 :             written_wal_seconds,
     541             485 :             flushed_wal_seconds,
     542             485 :             collect_timeline_metrics,
     543             485 :             timelines_count,
     544             485 :         }
     545             485 :     }
     546                 : }
     547                 : 
     548                 : impl Collector for TimelineCollector {
     549             485 :     fn desc(&self) -> Vec<&Desc> {
     550             485 :         self.descs.iter().collect()
     551             485 :     }
     552                 : 
     553              19 :     fn collect(&self) -> Vec<MetricFamily> {
     554              19 :         let start_collecting = Instant::now();
     555              19 : 
     556              19 :         // reset all metrics to clean up inactive timelines
     557              19 :         self.commit_lsn.reset();
     558              19 :         self.backup_lsn.reset();
     559              19 :         self.flush_lsn.reset();
     560              19 :         self.epoch_start_lsn.reset();
     561              19 :         self.peer_horizon_lsn.reset();
     562              19 :         self.remote_consistent_lsn.reset();
     563              19 :         self.ps_last_received_lsn.reset();
     564              19 :         self.feedback_last_time_seconds.reset();
     565              19 :         self.timeline_active.reset();
     566              19 :         self.wal_backup_active.reset();
     567              19 :         self.connected_computes.reset();
     568              19 :         self.disk_usage.reset();
     569              19 :         self.acceptor_term.reset();
     570              19 :         self.written_wal_bytes.reset();
     571              19 :         self.written_wal_seconds.reset();
     572              19 :         self.flushed_wal_seconds.reset();
     573              19 : 
     574              19 :         let timelines = GlobalTimelines::get_all();
     575              19 :         let timelines_count = timelines.len();
     576              19 : 
     577              19 :         // Prometheus Collector is sync, and data is stored under async lock. To
     578              19 :         // bridge the gap with a crutch, collect data in spawned thread with
     579              19 :         // local tokio runtime.
     580              19 :         let infos = std::thread::spawn(|| {
     581              19 :             let rt = tokio::runtime::Builder::new_current_thread()
     582              19 :                 .build()
     583              19 :                 .expect("failed to create rt");
     584              19 :             rt.block_on(collect_timeline_metrics())
     585              19 :         })
     586              19 :         .join()
     587              19 :         .expect("collect_timeline_metrics thread panicked");
     588                 : 
     589              70 :         for tli in &infos {
     590              51 :             let tenant_id = tli.ttid.tenant_id.to_string();
     591              51 :             let timeline_id = tli.ttid.timeline_id.to_string();
     592              51 :             let labels = &[tenant_id.as_str(), timeline_id.as_str()];
     593              51 : 
     594              51 :             self.commit_lsn
     595              51 :                 .with_label_values(labels)
     596              51 :                 .set(tli.mem_state.commit_lsn.into());
     597              51 :             self.backup_lsn
     598              51 :                 .with_label_values(labels)
     599              51 :                 .set(tli.mem_state.backup_lsn.into());
     600              51 :             self.flush_lsn
     601              51 :                 .with_label_values(labels)
     602              51 :                 .set(tli.flush_lsn.into());
     603              51 :             self.epoch_start_lsn
     604              51 :                 .with_label_values(labels)
     605              51 :                 .set(tli.epoch_start_lsn.into());
     606              51 :             self.peer_horizon_lsn
     607              51 :                 .with_label_values(labels)
     608              51 :                 .set(tli.mem_state.peer_horizon_lsn.into());
     609              51 :             self.remote_consistent_lsn
     610              51 :                 .with_label_values(labels)
     611              51 :                 .set(tli.remote_consistent_lsn.into());
     612              51 :             self.timeline_active
     613              51 :                 .with_label_values(labels)
     614              51 :                 .set(tli.timeline_is_active as u64);
     615              51 :             self.wal_backup_active
     616              51 :                 .with_label_values(labels)
     617              51 :                 .set(tli.wal_backup_active as u64);
     618              51 :             self.connected_computes
     619              51 :                 .with_label_values(labels)
     620              51 :                 .set(tli.num_computes as i64);
     621              51 :             self.acceptor_term
     622              51 :                 .with_label_values(labels)
     623              51 :                 .set(tli.persisted_state.acceptor_state.term);
     624              51 :             self.written_wal_bytes
     625              51 :                 .with_label_values(labels)
     626              51 :                 .set(tli.wal_storage.write_wal_bytes);
     627              51 :             self.written_wal_seconds
     628              51 :                 .with_label_values(labels)
     629              51 :                 .set(tli.wal_storage.write_wal_seconds);
     630              51 :             self.flushed_wal_seconds
     631              51 :                 .with_label_values(labels)
     632              51 :                 .set(tli.wal_storage.flush_wal_seconds);
     633              51 : 
     634              51 :             self.ps_last_received_lsn
     635              51 :                 .with_label_values(labels)
     636              51 :                 .set(tli.ps_feedback.last_received_lsn.0);
     637              51 :             if let Ok(unix_time) = tli
     638              51 :                 .ps_feedback
     639              51 :                 .replytime
     640              51 :                 .duration_since(SystemTime::UNIX_EPOCH)
     641              51 :             {
     642              51 :                 self.feedback_last_time_seconds
     643              51 :                     .with_label_values(labels)
     644              51 :                     .set(unix_time.as_secs());
     645              51 :             }
     646                 : 
     647              51 :             if tli.last_removed_segno != 0 {
     648 UBC           0 :                 let segno_count = tli
     649               0 :                     .flush_lsn
     650               0 :                     .segment_number(tli.persisted_state.server.wal_seg_size as usize)
     651               0 :                     - tli.last_removed_segno;
     652               0 :                 let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
     653               0 :                 self.disk_usage
     654               0 :                     .with_label_values(labels)
     655               0 :                     .set(disk_usage_bytes);
     656 CBC          51 :             }
     657                 :         }
     658                 : 
     659                 :         // collect MetricFamilys.
     660              19 :         let mut mfs = Vec::new();
     661              19 :         mfs.extend(self.commit_lsn.collect());
     662              19 :         mfs.extend(self.backup_lsn.collect());
     663              19 :         mfs.extend(self.flush_lsn.collect());
     664              19 :         mfs.extend(self.epoch_start_lsn.collect());
     665              19 :         mfs.extend(self.peer_horizon_lsn.collect());
     666              19 :         mfs.extend(self.remote_consistent_lsn.collect());
     667              19 :         mfs.extend(self.ps_last_received_lsn.collect());
     668              19 :         mfs.extend(self.feedback_last_time_seconds.collect());
     669              19 :         mfs.extend(self.timeline_active.collect());
     670              19 :         mfs.extend(self.wal_backup_active.collect());
     671              19 :         mfs.extend(self.connected_computes.collect());
     672              19 :         mfs.extend(self.disk_usage.collect());
     673              19 :         mfs.extend(self.acceptor_term.collect());
     674              19 :         mfs.extend(self.written_wal_bytes.collect());
     675              19 :         mfs.extend(self.written_wal_seconds.collect());
     676              19 :         mfs.extend(self.flushed_wal_seconds.collect());
     677              19 : 
     678              19 :         // report time it took to collect all info
     679              19 :         let elapsed = start_collecting.elapsed().as_secs_f64();
     680              19 :         self.collect_timeline_metrics.set(elapsed);
     681              19 :         mfs.extend(self.collect_timeline_metrics.collect());
     682              19 : 
     683              19 :         // report total number of timelines
     684              19 :         self.timelines_count.set(timelines_count as i64);
     685              19 :         mfs.extend(self.timelines_count.collect());
     686              19 : 
     687              19 :         mfs
     688              19 :     }
     689                 : }
     690                 : 
     691              19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
     692              19 :     let mut res = vec![];
     693              19 :     let timelines = GlobalTimelines::get_all();
     694                 : 
     695              70 :     for tli in timelines {
     696              51 :         if let Some(info) = tli.info_for_metrics().await {
     697              51 :             res.push(info);
     698              51 :         }
     699                 :     }
     700              19 :     res
     701              19 : }
        

Generated by: LCOV version 2.1-beta