LCOV - code coverage report
Current view: top level - safekeeper/src - metrics.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 93.6 % 561 525
Test Date: 2024-02-07 07:37:29 Functions: 78.9 % 57 45

            Line data    Source code
       1              : //! Global safekeeper metrics and per-timeline safekeeper metrics.
       2              : 
       3              : use std::{
       4              :     sync::{Arc, RwLock},
       5              :     time::{Instant, SystemTime},
       6              : };
       7              : 
       8              : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
       9              : use anyhow::Result;
      10              : use futures::Future;
      11              : use metrics::{
      12              :     core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
      13              :     proto::MetricFamily,
      14              :     register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
      15              :     IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
      16              : };
      17              : use once_cell::sync::Lazy;
      18              : 
      19              : use postgres_ffi::XLogSegNo;
      20              : use utils::pageserver_feedback::PageserverFeedback;
      21              : use utils::{id::TenantTimelineId, lsn::Lsn};
      22              : 
      23              : use crate::{
      24              :     state::{TimelineMemState, TimelinePersistentState},
      25              :     GlobalTimelines,
      26              : };
      27              : 
      28              : // Global metrics across all timelines.
      29          433 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
      30          433 :     register_histogram!(
      31          433 :         "safekeeper_write_wal_bytes",
      32          433 :         "Bytes written to WAL in a single request",
      33          433 :         vec![
      34          433 :             1.0,
      35          433 :             10.0,
      36          433 :             100.0,
      37          433 :             1024.0,
      38          433 :             8192.0,
      39          433 :             128.0 * 1024.0,
      40          433 :             1024.0 * 1024.0,
      41          433 :             10.0 * 1024.0 * 1024.0
      42          433 :         ]
      43          433 :     )
      44          433 :     .expect("Failed to register safekeeper_write_wal_bytes histogram")
      45          433 : });
      46          433 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      47          433 :     register_histogram!(
      48          433 :         "safekeeper_write_wal_seconds",
      49          433 :         "Seconds spent writing and syncing WAL to a disk in a single request",
      50          433 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      51          433 :     )
      52          433 :     .expect("Failed to register safekeeper_write_wal_seconds histogram")
      53          433 : });
      54            0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      55            0 :     register_histogram!(
      56            0 :         "safekeeper_flush_wal_seconds",
      57            0 :         "Seconds spent syncing WAL to a disk",
      58            0 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      59            0 :     )
      60            0 :     .expect("Failed to register safekeeper_flush_wal_seconds histogram")
      61            0 : });
      62          441 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      63          441 :     register_histogram!(
      64          441 :         "safekeeper_persist_control_file_seconds",
      65          441 :         "Seconds to persist and sync control file",
      66          441 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      67          441 :     )
      68          441 :     .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
      69          441 : });
      70          442 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
      71          442 :     register_int_counter_vec!(
      72          442 :         "safekeeper_pg_io_bytes_total",
      73          442 :         "Bytes read from or written to any PostgreSQL connection",
      74          442 :         &["client_az", "sk_az", "app_name", "dir", "same_az"]
      75          442 :     )
      76          442 :     .expect("Failed to register safekeeper_pg_io_bytes gauge")
      77          442 : });
      78          429 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
      79          429 :     register_int_counter!(
      80          429 :         "safekeeper_broker_pushed_updates_total",
      81          429 :         "Number of timeline updates pushed to the broker"
      82          429 :     )
      83          429 :     .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
      84          429 : });
      85          510 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
      86          510 :     register_int_counter_vec!(
      87          510 :         "safekeeper_broker_pulled_updates_total",
      88          510 :         "Number of timeline updates pulled and processed from the broker",
      89          510 :         &["result"]
      90          510 :     )
      91          510 :     .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
      92          510 : });
      93          442 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
      94          884 :     register_int_counter_pair_vec!(
      95          884 :         "safekeeper_pg_queries_received_total",
      96          884 :         "Number of queries received through pg protocol",
      97          884 :         "safekeeper_pg_queries_finished_total",
      98          884 :         "Number of queries finished through pg protocol",
      99          884 :         &["query"]
     100          884 :     )
     101          442 :     .expect("Failed to register safekeeper_pg_queries_finished_total counter")
     102          442 : });
     103            7 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     104            7 :     register_int_counter!(
     105            7 :         "safekeeper_removed_wal_segments_total",
     106            7 :         "Number of WAL segments removed from the disk"
     107            7 :     )
     108            7 :     .expect("Failed to register safekeeper_removed_wal_segments_total counter")
     109            7 : });
     110            6 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     111            6 :     register_int_counter!(
     112            6 :         "safekeeper_backed_up_segments_total",
     113            6 :         "Number of WAL segments backed up to the S3"
     114            6 :     )
     115            6 :     .expect("Failed to register safekeeper_backed_up_segments_total counter")
     116            6 : });
     117            0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
     118            0 :     register_int_counter!(
     119            0 :         "safekeeper_backup_errors_total",
     120            0 :         "Number of errors during backup"
     121            0 :     )
     122            0 :     .expect("Failed to register safekeeper_backup_errors_total counter")
     123            0 : });
     124          510 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     125          510 :     register_histogram!(
     126          510 :         "safekeeper_broker_push_update_seconds",
     127          510 :         "Seconds to push all timeline updates to the broker",
     128          510 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
     129          510 :     )
     130          510 :     .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
     131          510 : });
     132              : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
     133              :     1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
     134              : ];
     135          510 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
     136          510 :     register_histogram!(
     137          510 :         "safekeeper_broker_iteration_timelines",
     138          510 :         "Count of timelines pushed to the broker in a single iteration",
     139          510 :         TIMELINES_COUNT_BUCKETS.to_vec()
     140          510 :     )
     141          510 :     .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
     142          510 : });
     143              : 
     144              : pub const LABEL_UNKNOWN: &str = "unknown";
     145              : 
     146              : /// Labels for traffic metrics.
     147            0 : #[derive(Clone)]
     148              : struct ConnectionLabels {
     149              :     /// Availability zone of the connection origin.
     150              :     client_az: String,
     151              :     /// Availability zone of the current safekeeper.
     152              :     sk_az: String,
     153              :     /// Client application name.
     154              :     app_name: String,
     155              : }
     156              : 
     157              : impl ConnectionLabels {
     158         3292 :     fn new() -> Self {
     159         3292 :         Self {
     160         3292 :             client_az: LABEL_UNKNOWN.to_string(),
     161         3292 :             sk_az: LABEL_UNKNOWN.to_string(),
     162         3292 :             app_name: LABEL_UNKNOWN.to_string(),
     163         3292 :         }
     164         3292 :     }
     165              : 
     166         7338 :     fn build_metrics(
     167         7338 :         &self,
     168         7338 :     ) -> (
     169         7338 :         GenericCounter<metrics::core::AtomicU64>,
     170         7338 :         GenericCounter<metrics::core::AtomicU64>,
     171         7338 :     ) {
     172         7338 :         let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
     173         7338 :             (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
     174            4 :             (client_az, sk_az) => {
     175            4 :                 if client_az == sk_az {
     176            0 :                     "true"
     177              :                 } else {
     178            4 :                     "false"
     179              :                 }
     180              :             }
     181              :         };
     182              : 
     183         7338 :         let read = PG_IO_BYTES.with_label_values(&[
     184         7338 :             &self.client_az,
     185         7338 :             &self.sk_az,
     186         7338 :             &self.app_name,
     187         7338 :             "read",
     188         7338 :             same_az,
     189         7338 :         ]);
     190         7338 :         let write = PG_IO_BYTES.with_label_values(&[
     191         7338 :             &self.client_az,
     192         7338 :             &self.sk_az,
     193         7338 :             &self.app_name,
     194         7338 :             "write",
     195         7338 :             same_az,
     196         7338 :         ]);
     197         7338 :         (read, write)
     198         7338 :     }
     199              : }
     200              : 
     201              : struct TrafficMetricsState {
     202              :     /// Labels for traffic metrics.
     203              :     labels: ConnectionLabels,
     204              :     /// Total bytes read from this connection.
     205              :     read: GenericCounter<metrics::core::AtomicU64>,
     206              :     /// Total bytes written to this connection.
     207              :     write: GenericCounter<metrics::core::AtomicU64>,
     208              : }
     209              : 
     210              : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
     211         3292 : #[derive(Clone)]
     212              : pub struct TrafficMetrics {
     213              :     state: Arc<RwLock<TrafficMetricsState>>,
     214              : }
     215              : 
     216              : impl Default for TrafficMetrics {
     217            0 :     fn default() -> Self {
     218            0 :         Self::new()
     219            0 :     }
     220              : }
     221              : 
     222              : impl TrafficMetrics {
     223         3292 :     pub fn new() -> Self {
     224         3292 :         let labels = ConnectionLabels::new();
     225         3292 :         let (read, write) = labels.build_metrics();
     226         3292 :         let state = TrafficMetricsState {
     227         3292 :             labels,
     228         3292 :             read,
     229         3292 :             write,
     230         3292 :         };
     231         3292 :         Self {
     232         3292 :             state: Arc::new(RwLock::new(state)),
     233         3292 :         }
     234         3292 :     }
     235              : 
     236            2 :     pub fn set_client_az(&self, value: &str) {
     237            2 :         let mut state = self.state.write().unwrap();
     238            2 :         state.labels.client_az = value.to_string();
     239            2 :         (state.read, state.write) = state.labels.build_metrics();
     240            2 :     }
     241              : 
     242         3292 :     pub fn set_sk_az(&self, value: &str) {
     243         3292 :         let mut state = self.state.write().unwrap();
     244         3292 :         state.labels.sk_az = value.to_string();
     245         3292 :         (state.read, state.write) = state.labels.build_metrics();
     246         3292 :     }
     247              : 
     248          752 :     pub fn set_app_name(&self, value: &str) {
     249          752 :         let mut state = self.state.write().unwrap();
     250          752 :         state.labels.app_name = value.to_string();
     251          752 :         (state.read, state.write) = state.labels.build_metrics();
     252          752 :     }
     253              : 
     254      2928659 :     pub fn observe_read(&self, cnt: usize) {
     255      2928659 :         self.state.read().unwrap().read.inc_by(cnt as u64)
     256      2928659 :     }
     257              : 
     258      2483186 :     pub fn observe_write(&self, cnt: usize) {
     259      2483186 :         self.state.read().unwrap().write.inc_by(cnt as u64)
     260      2483186 :     }
     261              : }
     262              : 
     263              : /// Metrics for WalStorage in a single timeline.
     264          665 : #[derive(Clone, Default)]
     265              : pub struct WalStorageMetrics {
     266              :     /// How much bytes were written in total.
     267              :     write_wal_bytes: u64,
     268              :     /// How much time spent writing WAL to disk, waiting for write(2).
     269              :     write_wal_seconds: f64,
     270              :     /// How much time spent syncing WAL to disk, waiting for fsync(2).
     271              :     flush_wal_seconds: f64,
     272              : }
     273              : 
     274              : impl WalStorageMetrics {
     275      1668332 :     pub fn observe_write_bytes(&mut self, bytes: usize) {
     276      1668332 :         self.write_wal_bytes += bytes as u64;
     277      1668332 :         WRITE_WAL_BYTES.observe(bytes as f64);
     278      1668332 :     }
     279              : 
     280      1668332 :     pub fn observe_write_seconds(&mut self, seconds: f64) {
     281      1668332 :         self.write_wal_seconds += seconds;
     282      1668332 :         WRITE_WAL_SECONDS.observe(seconds);
     283      1668332 :     }
     284              : 
     285            0 :     pub fn observe_flush_seconds(&mut self, seconds: f64) {
     286            0 :         self.flush_wal_seconds += seconds;
     287            0 :         FLUSH_WAL_SECONDS.observe(seconds);
     288            0 :     }
     289              : }
     290              : 
     291              : /// Awaits a future that resolves to an empty result and returns the duration of its execution in seconds.
     292      1668334 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
     293      1668334 :     closure: impl Future<Output = Result<(), E>>,
     294      1668334 : ) -> Result<f64> {
     295      1668334 :     let start = std::time::Instant::now();
     296      3129169 :     closure.await.map_err(|e| e.into())?;
     297      1668332 :     Ok(start.elapsed().as_secs_f64())
     298      1668334 : }
     299              : 
     300              : /// Metrics for a single timeline.
     301            0 : #[derive(Clone)]
     302              : pub struct FullTimelineInfo {
     303              :     pub ttid: TenantTimelineId,
     304              :     pub ps_feedback: PageserverFeedback,
     305              :     pub wal_backup_active: bool,
     306              :     pub timeline_is_active: bool,
     307              :     pub num_computes: u32,
     308              :     pub last_removed_segno: XLogSegNo,
     309              : 
     310              :     pub epoch_start_lsn: Lsn,
     311              :     pub mem_state: TimelineMemState,
     312              :     pub persisted_state: TimelinePersistentState,
     313              : 
     314              :     pub flush_lsn: Lsn,
     315              : 
     316              :     pub wal_storage: WalStorageMetrics,
     317              : }
     318              : 
     319              : /// Collects metrics for all active timelines.
     320              : pub struct TimelineCollector {
     321              :     descs: Vec<Desc>,
     322              :     commit_lsn: GenericGaugeVec<AtomicU64>,
     323              :     backup_lsn: GenericGaugeVec<AtomicU64>,
     324              :     flush_lsn: GenericGaugeVec<AtomicU64>,
     325              :     epoch_start_lsn: GenericGaugeVec<AtomicU64>,
     326              :     peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
     327              :     remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
     328              :     ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
     329              :     feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
     330              :     timeline_active: GenericGaugeVec<AtomicU64>,
     331              :     wal_backup_active: GenericGaugeVec<AtomicU64>,
     332              :     connected_computes: IntGaugeVec,
     333              :     disk_usage: GenericGaugeVec<AtomicU64>,
     334              :     acceptor_term: GenericGaugeVec<AtomicU64>,
     335              :     written_wal_bytes: GenericGaugeVec<AtomicU64>,
     336              :     written_wal_seconds: GaugeVec,
     337              :     flushed_wal_seconds: GaugeVec,
     338              :     collect_timeline_metrics: Gauge,
     339              :     timelines_count: IntGauge,
     340              :     active_timelines_count: IntGauge,
     341              : }
     342              : 
     343              : impl Default for TimelineCollector {
     344            0 :     fn default() -> Self {
     345            0 :         Self::new()
     346            0 :     }
     347              : }
     348              : 
     349              : impl TimelineCollector {
     350          510 :     pub fn new() -> TimelineCollector {
     351          510 :         let mut descs = Vec::new();
     352          510 : 
     353          510 :         let commit_lsn = GenericGaugeVec::new(
     354          510 :             Opts::new(
     355          510 :                 "safekeeper_commit_lsn",
     356          510 :                 "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
     357          510 :             ),
     358          510 :             &["tenant_id", "timeline_id"],
     359          510 :         )
     360          510 :         .unwrap();
     361          510 :         descs.extend(commit_lsn.desc().into_iter().cloned());
     362          510 : 
     363          510 :         let backup_lsn = GenericGaugeVec::new(
     364          510 :             Opts::new(
     365          510 :                 "safekeeper_backup_lsn",
     366          510 :                 "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
     367          510 :             ),
     368          510 :             &["tenant_id", "timeline_id"],
     369          510 :         )
     370          510 :         .unwrap();
     371          510 :         descs.extend(backup_lsn.desc().into_iter().cloned());
     372          510 : 
     373          510 :         let flush_lsn = GenericGaugeVec::new(
     374          510 :             Opts::new(
     375          510 :                 "safekeeper_flush_lsn",
     376          510 :                 "Current flush_lsn, grouped by timeline",
     377          510 :             ),
     378          510 :             &["tenant_id", "timeline_id"],
     379          510 :         )
     380          510 :         .unwrap();
     381          510 :         descs.extend(flush_lsn.desc().into_iter().cloned());
     382          510 : 
     383          510 :         let epoch_start_lsn = GenericGaugeVec::new(
     384          510 :             Opts::new(
     385          510 :                 "safekeeper_epoch_start_lsn",
     386          510 :                 "Point since which compute generates new WAL in the current consensus term",
     387          510 :             ),
     388          510 :             &["tenant_id", "timeline_id"],
     389          510 :         )
     390          510 :         .unwrap();
     391          510 :         descs.extend(epoch_start_lsn.desc().into_iter().cloned());
     392          510 : 
     393          510 :         let peer_horizon_lsn = GenericGaugeVec::new(
     394          510 :             Opts::new(
     395          510 :                 "safekeeper_peer_horizon_lsn",
     396          510 :                 "LSN of the most lagging safekeeper",
     397          510 :             ),
     398          510 :             &["tenant_id", "timeline_id"],
     399          510 :         )
     400          510 :         .unwrap();
     401          510 :         descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
     402          510 : 
     403          510 :         let remote_consistent_lsn = GenericGaugeVec::new(
     404          510 :             Opts::new(
     405          510 :                 "safekeeper_remote_consistent_lsn",
     406          510 :                 "LSN which is persisted to the remote storage in pageserver",
     407          510 :             ),
     408          510 :             &["tenant_id", "timeline_id"],
     409          510 :         )
     410          510 :         .unwrap();
     411          510 :         descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
     412          510 : 
     413          510 :         let ps_last_received_lsn = GenericGaugeVec::new(
     414          510 :             Opts::new(
     415          510 :                 "safekeeper_ps_last_received_lsn",
     416          510 :                 "Last LSN received by the pageserver, acknowledged in the feedback",
     417          510 :             ),
     418          510 :             &["tenant_id", "timeline_id"],
     419          510 :         )
     420          510 :         .unwrap();
     421          510 :         descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
     422          510 : 
     423          510 :         let feedback_last_time_seconds = GenericGaugeVec::new(
     424          510 :             Opts::new(
     425          510 :                 "safekeeper_feedback_last_time_seconds",
     426          510 :                 "Timestamp of the last feedback from the pageserver",
     427          510 :             ),
     428          510 :             &["tenant_id", "timeline_id"],
     429          510 :         )
     430          510 :         .unwrap();
     431          510 :         descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
     432          510 : 
     433          510 :         let timeline_active = GenericGaugeVec::new(
     434          510 :             Opts::new(
     435          510 :                 "safekeeper_timeline_active",
     436          510 :                 "Reports 1 for active timelines, 0 for inactive",
     437          510 :             ),
     438          510 :             &["tenant_id", "timeline_id"],
     439          510 :         )
     440          510 :         .unwrap();
     441          510 :         descs.extend(timeline_active.desc().into_iter().cloned());
     442          510 : 
     443          510 :         let wal_backup_active = GenericGaugeVec::new(
     444          510 :             Opts::new(
     445          510 :                 "safekeeper_wal_backup_active",
     446          510 :                 "Reports 1 for timelines with active WAL backup, 0 otherwise",
     447          510 :             ),
     448          510 :             &["tenant_id", "timeline_id"],
     449          510 :         )
     450          510 :         .unwrap();
     451          510 :         descs.extend(wal_backup_active.desc().into_iter().cloned());
     452          510 : 
     453          510 :         let connected_computes = IntGaugeVec::new(
     454          510 :             Opts::new(
     455          510 :                 "safekeeper_connected_computes",
     456          510 :                 "Number of active compute connections",
     457          510 :             ),
     458          510 :             &["tenant_id", "timeline_id"],
     459          510 :         )
     460          510 :         .unwrap();
     461          510 :         descs.extend(connected_computes.desc().into_iter().cloned());
     462          510 : 
     463          510 :         let disk_usage = GenericGaugeVec::new(
     464          510 :             Opts::new(
     465          510 :                 "safekeeper_disk_usage_bytes",
     466          510 :                 "Estimated disk space used to store WAL segments",
     467          510 :             ),
     468          510 :             &["tenant_id", "timeline_id"],
     469          510 :         )
     470          510 :         .unwrap();
     471          510 :         descs.extend(disk_usage.desc().into_iter().cloned());
     472          510 : 
     473          510 :         let acceptor_term = GenericGaugeVec::new(
     474          510 :             Opts::new("safekeeper_acceptor_term", "Current consensus term"),
     475          510 :             &["tenant_id", "timeline_id"],
     476          510 :         )
     477          510 :         .unwrap();
     478          510 :         descs.extend(acceptor_term.desc().into_iter().cloned());
     479          510 : 
     480          510 :         let written_wal_bytes = GenericGaugeVec::new(
     481          510 :             Opts::new(
     482          510 :                 "safekeeper_written_wal_bytes_total",
     483          510 :                 "Number of WAL bytes written to disk, grouped by timeline",
     484          510 :             ),
     485          510 :             &["tenant_id", "timeline_id"],
     486          510 :         )
     487          510 :         .unwrap();
     488          510 :         descs.extend(written_wal_bytes.desc().into_iter().cloned());
     489          510 : 
     490          510 :         let written_wal_seconds = GaugeVec::new(
     491          510 :             Opts::new(
     492          510 :                 "safekeeper_written_wal_seconds_total",
     493          510 :                 "Total time spent in write(2) writing WAL to disk, grouped by timeline",
     494          510 :             ),
     495          510 :             &["tenant_id", "timeline_id"],
     496          510 :         )
     497          510 :         .unwrap();
     498          510 :         descs.extend(written_wal_seconds.desc().into_iter().cloned());
     499          510 : 
     500          510 :         let flushed_wal_seconds = GaugeVec::new(
     501          510 :             Opts::new(
     502          510 :                 "safekeeper_flushed_wal_seconds_total",
     503          510 :                 "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
     504          510 :             ),
     505          510 :             &["tenant_id", "timeline_id"],
     506          510 :         )
     507          510 :         .unwrap();
     508          510 :         descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
     509          510 : 
     510          510 :         let collect_timeline_metrics = Gauge::new(
     511          510 :             "safekeeper_collect_timeline_metrics_seconds",
     512          510 :             "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
     513          510 :         )
     514          510 :         .unwrap();
     515          510 :         descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
     516          510 : 
     517          510 :         let timelines_count = IntGauge::new(
     518          510 :             "safekeeper_timelines",
     519          510 :             "Total number of timelines loaded in-memory",
     520          510 :         )
     521          510 :         .unwrap();
     522          510 :         descs.extend(timelines_count.desc().into_iter().cloned());
     523          510 : 
     524          510 :         let active_timelines_count = IntGauge::new(
     525          510 :             "safekeeper_active_timelines",
     526          510 :             "Total number of active timelines",
     527          510 :         )
     528          510 :         .unwrap();
     529          510 :         descs.extend(active_timelines_count.desc().into_iter().cloned());
     530          510 : 
     531          510 :         TimelineCollector {
     532          510 :             descs,
     533          510 :             commit_lsn,
     534          510 :             backup_lsn,
     535          510 :             flush_lsn,
     536          510 :             epoch_start_lsn,
     537          510 :             peer_horizon_lsn,
     538          510 :             remote_consistent_lsn,
     539          510 :             ps_last_received_lsn,
     540          510 :             feedback_last_time_seconds,
     541          510 :             timeline_active,
     542          510 :             wal_backup_active,
     543          510 :             connected_computes,
     544          510 :             disk_usage,
     545          510 :             acceptor_term,
     546          510 :             written_wal_bytes,
     547          510 :             written_wal_seconds,
     548          510 :             flushed_wal_seconds,
     549          510 :             collect_timeline_metrics,
     550          510 :             timelines_count,
     551          510 :             active_timelines_count,
     552          510 :         }
     553          510 :     }
     554              : }
     555              : 
     556              : impl Collector for TimelineCollector {
     557          510 :     fn desc(&self) -> Vec<&Desc> {
     558          510 :         self.descs.iter().collect()
     559          510 :     }
     560              : 
     561           19 :     fn collect(&self) -> Vec<MetricFamily> {
     562           19 :         let start_collecting = Instant::now();
     563           19 : 
     564           19 :         // reset all metrics to clean up inactive timelines
     565           19 :         self.commit_lsn.reset();
     566           19 :         self.backup_lsn.reset();
     567           19 :         self.flush_lsn.reset();
     568           19 :         self.epoch_start_lsn.reset();
     569           19 :         self.peer_horizon_lsn.reset();
     570           19 :         self.remote_consistent_lsn.reset();
     571           19 :         self.ps_last_received_lsn.reset();
     572           19 :         self.feedback_last_time_seconds.reset();
     573           19 :         self.timeline_active.reset();
     574           19 :         self.wal_backup_active.reset();
     575           19 :         self.connected_computes.reset();
     576           19 :         self.disk_usage.reset();
     577           19 :         self.acceptor_term.reset();
     578           19 :         self.written_wal_bytes.reset();
     579           19 :         self.written_wal_seconds.reset();
     580           19 :         self.flushed_wal_seconds.reset();
     581           19 : 
     582           19 :         let timelines = GlobalTimelines::get_all();
     583           19 :         let timelines_count = timelines.len();
     584           19 :         let mut active_timelines_count = 0;
     585           19 : 
     586           19 :         // Prometheus Collector is sync, and data is stored under async lock. To
     587           19 :         // bridge the gap with a crutch, collect data in spawned thread with
     588           19 :         // local tokio runtime.
     589           19 :         let infos = std::thread::spawn(|| {
     590           19 :             let rt = tokio::runtime::Builder::new_current_thread()
     591           19 :                 .build()
     592           19 :                 .expect("failed to create rt");
     593           19 :             rt.block_on(collect_timeline_metrics())
     594           19 :         })
     595           19 :         .join()
     596           19 :         .expect("collect_timeline_metrics thread panicked");
     597              : 
     598           70 :         for tli in &infos {
     599           51 :             let tenant_id = tli.ttid.tenant_id.to_string();
     600           51 :             let timeline_id = tli.ttid.timeline_id.to_string();
     601           51 :             let labels = &[tenant_id.as_str(), timeline_id.as_str()];
     602           51 : 
     603           51 :             if tli.timeline_is_active {
     604           51 :                 active_timelines_count += 1;
     605           51 :             }
     606              : 
     607           51 :             self.commit_lsn
     608           51 :                 .with_label_values(labels)
     609           51 :                 .set(tli.mem_state.commit_lsn.into());
     610           51 :             self.backup_lsn
     611           51 :                 .with_label_values(labels)
     612           51 :                 .set(tli.mem_state.backup_lsn.into());
     613           51 :             self.flush_lsn
     614           51 :                 .with_label_values(labels)
     615           51 :                 .set(tli.flush_lsn.into());
     616           51 :             self.epoch_start_lsn
     617           51 :                 .with_label_values(labels)
     618           51 :                 .set(tli.epoch_start_lsn.into());
     619           51 :             self.peer_horizon_lsn
     620           51 :                 .with_label_values(labels)
     621           51 :                 .set(tli.mem_state.peer_horizon_lsn.into());
     622           51 :             self.remote_consistent_lsn
     623           51 :                 .with_label_values(labels)
     624           51 :                 .set(tli.mem_state.remote_consistent_lsn.into());
     625           51 :             self.timeline_active
     626           51 :                 .with_label_values(labels)
     627           51 :                 .set(tli.timeline_is_active as u64);
     628           51 :             self.wal_backup_active
     629           51 :                 .with_label_values(labels)
     630           51 :                 .set(tli.wal_backup_active as u64);
     631           51 :             self.connected_computes
     632           51 :                 .with_label_values(labels)
     633           51 :                 .set(tli.num_computes as i64);
     634           51 :             self.acceptor_term
     635           51 :                 .with_label_values(labels)
     636           51 :                 .set(tli.persisted_state.acceptor_state.term);
     637           51 :             self.written_wal_bytes
     638           51 :                 .with_label_values(labels)
     639           51 :                 .set(tli.wal_storage.write_wal_bytes);
     640           51 :             self.written_wal_seconds
     641           51 :                 .with_label_values(labels)
     642           51 :                 .set(tli.wal_storage.write_wal_seconds);
     643           51 :             self.flushed_wal_seconds
     644           51 :                 .with_label_values(labels)
     645           51 :                 .set(tli.wal_storage.flush_wal_seconds);
     646           51 : 
     647           51 :             self.ps_last_received_lsn
     648           51 :                 .with_label_values(labels)
     649           51 :                 .set(tli.ps_feedback.last_received_lsn.0);
     650           51 :             if let Ok(unix_time) = tli
     651           51 :                 .ps_feedback
     652           51 :                 .replytime
     653           51 :                 .duration_since(SystemTime::UNIX_EPOCH)
     654           51 :             {
     655           51 :                 self.feedback_last_time_seconds
     656           51 :                     .with_label_values(labels)
     657           51 :                     .set(unix_time.as_secs());
     658           51 :             }
     659              : 
     660           51 :             if tli.last_removed_segno != 0 {
     661            0 :                 let segno_count = tli
     662            0 :                     .flush_lsn
     663            0 :                     .segment_number(tli.persisted_state.server.wal_seg_size as usize)
     664            0 :                     - tli.last_removed_segno;
     665            0 :                 let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
     666            0 :                 self.disk_usage
     667            0 :                     .with_label_values(labels)
     668            0 :                     .set(disk_usage_bytes);
     669           51 :             }
     670              :         }
     671              : 
     672              :         // collect MetricFamilys.
     673           19 :         let mut mfs = Vec::new();
     674           19 :         mfs.extend(self.commit_lsn.collect());
     675           19 :         mfs.extend(self.backup_lsn.collect());
     676           19 :         mfs.extend(self.flush_lsn.collect());
     677           19 :         mfs.extend(self.epoch_start_lsn.collect());
     678           19 :         mfs.extend(self.peer_horizon_lsn.collect());
     679           19 :         mfs.extend(self.remote_consistent_lsn.collect());
     680           19 :         mfs.extend(self.ps_last_received_lsn.collect());
     681           19 :         mfs.extend(self.feedback_last_time_seconds.collect());
     682           19 :         mfs.extend(self.timeline_active.collect());
     683           19 :         mfs.extend(self.wal_backup_active.collect());
     684           19 :         mfs.extend(self.connected_computes.collect());
     685           19 :         mfs.extend(self.disk_usage.collect());
     686           19 :         mfs.extend(self.acceptor_term.collect());
     687           19 :         mfs.extend(self.written_wal_bytes.collect());
     688           19 :         mfs.extend(self.written_wal_seconds.collect());
     689           19 :         mfs.extend(self.flushed_wal_seconds.collect());
     690           19 : 
     691           19 :         // report time it took to collect all info
     692           19 :         let elapsed = start_collecting.elapsed().as_secs_f64();
     693           19 :         self.collect_timeline_metrics.set(elapsed);
     694           19 :         mfs.extend(self.collect_timeline_metrics.collect());
     695           19 : 
     696           19 :         // report total number of timelines
     697           19 :         self.timelines_count.set(timelines_count as i64);
     698           19 :         self.active_timelines_count
     699           19 :             .set(active_timelines_count as i64);
     700           19 :         mfs.extend(self.timelines_count.collect());
     701           19 : 
     702           19 :         mfs
     703           19 :     }
     704              : }
     705              : 
     706           19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
     707           19 :     let mut res = vec![];
     708           19 :     let timelines = GlobalTimelines::get_all();
     709              : 
     710           70 :     for tli in timelines {
     711           51 :         if let Some(info) = tli.info_for_metrics().await {
     712           51 :             res.push(info);
     713           51 :         }
     714              :     }
     715           19 :     res
     716           19 : }
        

Generated by: LCOV version 2.1-beta