LCOV - code coverage report
Current view: top level - safekeeper/src - metrics.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 94.8 % 553 524
Test Date: 2023-09-06 10:18:01 Functions: 77.0 % 61 47

            Line data    Source code
       1              : //! Global safekeeper metrics and per-timeline safekeeper metrics.
       2              : 
       3              : use std::{
       4              :     sync::{Arc, RwLock},
       5              :     time::{Instant, SystemTime},
       6              : };
       7              : 
       8              : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
       9              : use anyhow::Result;
      10              : use futures::Future;
      11              : use metrics::{
      12              :     core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
      13              :     proto::MetricFamily,
      14              :     register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec,
      15              : };
      16              : use once_cell::sync::Lazy;
      17              : 
      18              : use postgres_ffi::XLogSegNo;
      19              : use utils::pageserver_feedback::PageserverFeedback;
      20              : use utils::{id::TenantTimelineId, lsn::Lsn};
      21              : 
      22              : use crate::{
      23              :     safekeeper::{SafeKeeperState, SafekeeperMemState},
      24              :     GlobalTimelines,
      25              : };
      26              : 
      27              : // Global metrics across all timelines.
      28          436 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
      29          436 :     register_histogram!(
      30          436 :         "safekeeper_write_wal_bytes",
      31          436 :         "Bytes written to WAL in a single request",
      32          436 :         vec![
      33          436 :             1.0,
      34          436 :             10.0,
      35          436 :             100.0,
      36          436 :             1024.0,
      37          436 :             8192.0,
      38          436 :             128.0 * 1024.0,
      39          436 :             1024.0 * 1024.0,
      40          436 :             10.0 * 1024.0 * 1024.0
      41          436 :         ]
      42          436 :     )
      43          436 :     .expect("Failed to register safekeeper_write_wal_bytes histogram")
      44          436 : });
      45          436 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      46          436 :     register_histogram!(
      47          436 :         "safekeeper_write_wal_seconds",
      48          436 :         "Seconds spent writing and syncing WAL to a disk in a single request",
      49          436 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      50          436 :     )
      51          436 :     .expect("Failed to register safekeeper_write_wal_seconds histogram")
      52          436 : });
      53            0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      54            0 :     register_histogram!(
      55            0 :         "safekeeper_flush_wal_seconds",
      56            0 :         "Seconds spent syncing WAL to a disk",
      57            0 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      58            0 :     )
      59            0 :     .expect("Failed to register safekeeper_flush_wal_seconds histogram")
      60            0 : });
      61          462 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      62          462 :     register_histogram!(
      63          462 :         "safekeeper_persist_control_file_seconds",
      64          462 :         "Seconds to persist and sync control file",
      65          462 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
      66          462 :     )
      67          462 :     .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
      68          462 : });
      69          463 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
      70          463 :     register_int_counter_vec!(
      71          463 :         "safekeeper_pg_io_bytes_total",
      72          463 :         "Bytes read from or written to any PostgreSQL connection",
      73          463 :         &["client_az", "sk_az", "app_name", "dir", "same_az"]
      74          463 :     )
      75          463 :     .expect("Failed to register safekeeper_pg_io_bytes gauge")
      76          463 : });
      77          447 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
      78          447 :     register_int_counter!(
      79          447 :         "safekeeper_broker_pushed_updates_total",
      80          447 :         "Number of timeline updates pushed to the broker"
      81          447 :     )
      82          447 :     .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
      83          447 : });
      84          517 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
      85          517 :     register_int_counter_vec!(
      86          517 :         "safekeeper_broker_pulled_updates_total",
      87          517 :         "Number of timeline updates pulled and processed from the broker",
      88          517 :         &["result"]
      89          517 :     )
      90          517 :     .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
      91          517 : });
      92          463 : pub static PG_QUERIES_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
      93          463 :     register_int_counter_vec!(
      94          463 :         "safekeeper_pg_queries_received_total",
      95          463 :         "Number of queries received through pg protocol",
      96          463 :         &["query"]
      97          463 :     )
      98          463 :     .expect("Failed to register safekeeper_pg_queries_received_total counter")
      99          463 : });
     100          456 : pub static PG_QUERIES_FINISHED: Lazy<IntCounterVec> = Lazy::new(|| {
     101          456 :     register_int_counter_vec!(
     102          456 :         "safekeeper_pg_queries_finished_total",
     103          456 :         "Number of queries finished through pg protocol",
     104          456 :         &["query"]
     105          456 :     )
     106          456 :     .expect("Failed to register safekeeper_pg_queries_finished_total counter")
     107          456 : });
     108           13 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     109           13 :     register_int_counter!(
     110           13 :         "safekeeper_removed_wal_segments_total",
     111           13 :         "Number of WAL segments removed from the disk"
     112           13 :     )
     113           13 :     .expect("Failed to register safekeeper_removed_wal_segments_total counter")
     114           13 : });
     115           13 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     116           13 :     register_int_counter!(
     117           13 :         "safekeeper_backed_up_segments_total",
     118           13 :         "Number of WAL segments backed up to the broker"
     119           13 :     )
     120           13 :     .expect("Failed to register safekeeper_backed_up_segments_total counter")
     121           13 : });
     122            2 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
     123            2 :     register_int_counter!(
     124            2 :         "safekeeper_backup_errors_total",
     125            2 :         "Number of errors during backup"
     126            2 :     )
     127            2 :     .expect("Failed to register safekeeper_backup_errors_total counter")
     128            2 : });
     129          517 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     130          517 :     register_histogram!(
     131          517 :         "safekeeper_broker_push_update_seconds",
     132          517 :         "Seconds to push all timeline updates to the broker",
     133          517 :         DISK_WRITE_SECONDS_BUCKETS.to_vec()
     134          517 :     )
     135          517 :     .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
     136          517 : });
     137              : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
     138              :     1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
     139              : ];
     140          517 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
     141          517 :     register_histogram!(
     142          517 :         "safekeeper_broker_iteration_timelines",
     143          517 :         "Count of timelines pushed to the broker in a single iteration",
     144          517 :         TIMELINES_COUNT_BUCKETS.to_vec()
     145          517 :     )
     146          517 :     .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
     147          517 : });
     148              : 
     149              : pub const LABEL_UNKNOWN: &str = "unknown";
     150              : 
     151              : /// Labels for traffic metrics.
     152            0 : #[derive(Clone)]
     153              : struct ConnectionLabels {
     154              :     /// Availability zone of the connection origin.
     155              :     client_az: String,
     156              :     /// Availability zone of the current safekeeper.
     157              :     sk_az: String,
     158              :     /// Client application name.
     159              :     app_name: String,
     160              : }
     161              : 
     162              : impl ConnectionLabels {
     163         3727 :     fn new() -> Self {
     164         3727 :         Self {
     165         3727 :             client_az: LABEL_UNKNOWN.to_string(),
     166         3727 :             sk_az: LABEL_UNKNOWN.to_string(),
     167         3727 :             app_name: LABEL_UNKNOWN.to_string(),
     168         3727 :         }
     169         3727 :     }
     170              : 
     171         8287 :     fn build_metrics(
     172         8287 :         &self,
     173         8287 :     ) -> (
     174         8287 :         GenericCounter<metrics::core::AtomicU64>,
     175         8287 :         GenericCounter<metrics::core::AtomicU64>,
     176         8287 :     ) {
     177         8287 :         let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
     178         8287 :             (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
     179            8 :             (client_az, sk_az) => {
     180            8 :                 if client_az == sk_az {
     181            0 :                     "true"
     182              :                 } else {
     183            8 :                     "false"
     184              :                 }
     185              :             }
     186              :         };
     187              : 
     188         8287 :         let read = PG_IO_BYTES.with_label_values(&[
     189         8287 :             &self.client_az,
     190         8287 :             &self.sk_az,
     191         8287 :             &self.app_name,
     192         8287 :             "read",
     193         8287 :             same_az,
     194         8287 :         ]);
     195         8287 :         let write = PG_IO_BYTES.with_label_values(&[
     196         8287 :             &self.client_az,
     197         8287 :             &self.sk_az,
     198         8287 :             &self.app_name,
     199         8287 :             "write",
     200         8287 :             same_az,
     201         8287 :         ]);
     202         8287 :         (read, write)
     203         8287 :     }
     204              : }
     205              : 
     206              : struct TrafficMetricsState {
     207              :     /// Labels for traffic metrics.
     208              :     labels: ConnectionLabels,
     209              :     /// Total bytes read from this connection.
     210              :     read: GenericCounter<metrics::core::AtomicU64>,
     211              :     /// Total bytes written to this connection.
     212              :     write: GenericCounter<metrics::core::AtomicU64>,
     213              : }
     214              : 
     215              : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
     216         3727 : #[derive(Clone)]
     217              : pub struct TrafficMetrics {
     218              :     state: Arc<RwLock<TrafficMetricsState>>,
     219              : }
     220              : 
     221              : impl Default for TrafficMetrics {
     222            0 :     fn default() -> Self {
     223            0 :         Self::new()
     224            0 :     }
     225              : }
     226              : 
     227              : impl TrafficMetrics {
     228         3727 :     pub fn new() -> Self {
     229         3727 :         let labels = ConnectionLabels::new();
     230         3727 :         let (read, write) = labels.build_metrics();
     231         3727 :         let state = TrafficMetricsState {
     232         3727 :             labels,
     233         3727 :             read,
     234         3727 :             write,
     235         3727 :         };
     236         3727 :         Self {
     237         3727 :             state: Arc::new(RwLock::new(state)),
     238         3727 :         }
     239         3727 :     }
     240              : 
     241            4 :     pub fn set_client_az(&self, value: &str) {
     242            4 :         let mut state = self.state.write().unwrap();
     243            4 :         state.labels.client_az = value.to_string();
     244            4 :         (state.read, state.write) = state.labels.build_metrics();
     245            4 :     }
     246              : 
     247         3727 :     pub fn set_sk_az(&self, value: &str) {
     248         3727 :         let mut state = self.state.write().unwrap();
     249         3727 :         state.labels.sk_az = value.to_string();
     250         3727 :         (state.read, state.write) = state.labels.build_metrics();
     251         3727 :     }
     252              : 
     253          829 :     pub fn set_app_name(&self, value: &str) {
     254          829 :         let mut state = self.state.write().unwrap();
     255          829 :         state.labels.app_name = value.to_string();
     256          829 :         (state.read, state.write) = state.labels.build_metrics();
     257          829 :     }
     258              : 
     259      3228652 :     pub fn observe_read(&self, cnt: usize) {
     260      3228652 :         self.state.read().unwrap().read.inc_by(cnt as u64)
     261      3228652 :     }
     262              : 
     263      2967391 :     pub fn observe_write(&self, cnt: usize) {
     264      2967391 :         self.state.read().unwrap().write.inc_by(cnt as u64)
     265      2967391 :     }
     266              : }
     267              : 
     268              : /// Metrics for WalStorage in a single timeline.
     269          605 : #[derive(Clone, Default)]
     270              : pub struct WalStorageMetrics {
     271              :     /// How many bytes were written in total.
     272              :     write_wal_bytes: u64,
     273              :     /// How much time spent writing WAL to disk, waiting for write(2).
     274              :     write_wal_seconds: f64,
     275              :     /// How much time spent syncing WAL to disk, waiting for fsync(2).
     276              :     flush_wal_seconds: f64,
     277              : }
     278              : 
     279              : impl WalStorageMetrics {
     280      1460402 :     pub fn observe_write_bytes(&mut self, bytes: usize) {
     281      1460402 :         self.write_wal_bytes += bytes as u64;
     282      1460402 :         WRITE_WAL_BYTES.observe(bytes as f64);
     283      1460402 :     }
     284              : 
     285      1460402 :     pub fn observe_write_seconds(&mut self, seconds: f64) {
     286      1460402 :         self.write_wal_seconds += seconds;
     287      1460402 :         WRITE_WAL_SECONDS.observe(seconds);
     288      1460402 :     }
     289              : 
     290            0 :     pub fn observe_flush_seconds(&mut self, seconds: f64) {
     291            0 :         self.flush_wal_seconds += seconds;
     292            0 :         FLUSH_WAL_SECONDS.observe(seconds);
     293            0 :     }
     294              : }
     295              : 
     296              : /// Awaits a future that resolves to an empty result and returns its execution time in seconds; an error is converted into `anyhow::Error` and propagated.
     297      1460402 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
     298      1460402 :     closure: impl Future<Output = Result<(), E>>,
     299      1460402 : ) -> Result<f64> {
     300      1460402 :     let start = std::time::Instant::now();
     301      3041998 :     closure.await.map_err(|e| e.into())?;
     302      1460402 :     Ok(start.elapsed().as_secs_f64())
     303      1460402 : }
     304              : 
     305              : /// Metrics for a single timeline.
     306            0 : #[derive(Clone)]
     307              : pub struct FullTimelineInfo {
     308              :     pub ttid: TenantTimelineId,
     309              :     pub ps_feedback: PageserverFeedback,
     310              :     pub wal_backup_active: bool,
     311              :     pub timeline_is_active: bool,
     312              :     pub num_computes: u32,
     313              :     pub last_removed_segno: XLogSegNo,
     314              : 
     315              :     pub epoch_start_lsn: Lsn,
     316              :     pub mem_state: SafekeeperMemState,
     317              :     pub persisted_state: SafeKeeperState,
     318              : 
     319              :     pub flush_lsn: Lsn,
     320              :     pub remote_consistent_lsn: Lsn,
     321              : 
     322              :     pub wal_storage: WalStorageMetrics,
     323              : }
     324              : 
     325              : /// Collects metrics for all active timelines.
     326              : pub struct TimelineCollector {
     327              :     descs: Vec<Desc>,
     328              :     commit_lsn: GenericGaugeVec<AtomicU64>,
     329              :     backup_lsn: GenericGaugeVec<AtomicU64>,
     330              :     flush_lsn: GenericGaugeVec<AtomicU64>,
     331              :     epoch_start_lsn: GenericGaugeVec<AtomicU64>,
     332              :     peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
     333              :     remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
     334              :     ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
     335              :     feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
     336              :     timeline_active: GenericGaugeVec<AtomicU64>,
     337              :     wal_backup_active: GenericGaugeVec<AtomicU64>,
     338              :     connected_computes: IntGaugeVec,
     339              :     disk_usage: GenericGaugeVec<AtomicU64>,
     340              :     acceptor_term: GenericGaugeVec<AtomicU64>,
     341              :     written_wal_bytes: GenericGaugeVec<AtomicU64>,
     342              :     written_wal_seconds: GaugeVec,
     343              :     flushed_wal_seconds: GaugeVec,
     344              :     collect_timeline_metrics: Gauge,
     345              :     timelines_count: IntGauge,
     346              : }
     347              : 
     348              : impl Default for TimelineCollector {
     349            0 :     fn default() -> Self {
     350            0 :         Self::new()
     351            0 :     }
     352              : }
     353              : 
     354              : impl TimelineCollector {
     355          517 :     pub fn new() -> TimelineCollector {
     356          517 :         let mut descs = Vec::new();
     357          517 : 
     358          517 :         let commit_lsn = GenericGaugeVec::new(
     359          517 :             Opts::new(
     360          517 :                 "safekeeper_commit_lsn",
     361          517 :                 "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
     362          517 :             ),
     363          517 :             &["tenant_id", "timeline_id"],
     364          517 :         )
     365          517 :         .unwrap();
     366          517 :         descs.extend(commit_lsn.desc().into_iter().cloned());
     367          517 : 
     368          517 :         let backup_lsn = GenericGaugeVec::new(
     369          517 :             Opts::new(
     370          517 :                 "safekeeper_backup_lsn",
     371          517 :                 "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
     372          517 :             ),
     373          517 :             &["tenant_id", "timeline_id"],
     374          517 :         )
     375          517 :         .unwrap();
     376          517 :         descs.extend(backup_lsn.desc().into_iter().cloned());
     377          517 : 
     378          517 :         let flush_lsn = GenericGaugeVec::new(
     379          517 :             Opts::new(
     380          517 :                 "safekeeper_flush_lsn",
     381          517 :                 "Current flush_lsn, grouped by timeline",
     382          517 :             ),
     383          517 :             &["tenant_id", "timeline_id"],
     384          517 :         )
     385          517 :         .unwrap();
     386          517 :         descs.extend(flush_lsn.desc().into_iter().cloned());
     387          517 : 
     388          517 :         let epoch_start_lsn = GenericGaugeVec::new(
     389          517 :             Opts::new(
     390          517 :                 "safekeeper_epoch_start_lsn",
     391          517 :                 "Point since which compute generates new WAL in the current consensus term",
     392          517 :             ),
     393          517 :             &["tenant_id", "timeline_id"],
     394          517 :         )
     395          517 :         .unwrap();
     396          517 :         descs.extend(epoch_start_lsn.desc().into_iter().cloned());
     397          517 : 
     398          517 :         let peer_horizon_lsn = GenericGaugeVec::new(
     399          517 :             Opts::new(
     400          517 :                 "safekeeper_peer_horizon_lsn",
     401          517 :                 "LSN of the most lagging safekeeper",
     402          517 :             ),
     403          517 :             &["tenant_id", "timeline_id"],
     404          517 :         )
     405          517 :         .unwrap();
     406          517 :         descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
     407          517 : 
     408          517 :         let remote_consistent_lsn = GenericGaugeVec::new(
     409          517 :             Opts::new(
     410          517 :                 "safekeeper_remote_consistent_lsn",
     411          517 :                 "LSN which is persisted to the remote storage in pageserver",
     412          517 :             ),
     413          517 :             &["tenant_id", "timeline_id"],
     414          517 :         )
     415          517 :         .unwrap();
     416          517 :         descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
     417          517 : 
     418          517 :         let ps_last_received_lsn = GenericGaugeVec::new(
     419          517 :             Opts::new(
     420          517 :                 "safekeeper_ps_last_received_lsn",
     421          517 :                 "Last LSN received by the pageserver, acknowledged in the feedback",
     422          517 :             ),
     423          517 :             &["tenant_id", "timeline_id"],
     424          517 :         )
     425          517 :         .unwrap();
     426          517 :         descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
     427          517 : 
     428          517 :         let feedback_last_time_seconds = GenericGaugeVec::new(
     429          517 :             Opts::new(
     430          517 :                 "safekeeper_feedback_last_time_seconds",
     431          517 :                 "Timestamp of the last feedback from the pageserver",
     432          517 :             ),
     433          517 :             &["tenant_id", "timeline_id"],
     434          517 :         )
     435          517 :         .unwrap();
     436          517 :         descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
     437          517 : 
     438          517 :         let timeline_active = GenericGaugeVec::new(
     439          517 :             Opts::new(
     440          517 :                 "safekeeper_timeline_active",
     441          517 :                 "Reports 1 for active timelines, 0 for inactive",
     442          517 :             ),
     443          517 :             &["tenant_id", "timeline_id"],
     444          517 :         )
     445          517 :         .unwrap();
     446          517 :         descs.extend(timeline_active.desc().into_iter().cloned());
     447          517 : 
     448          517 :         let wal_backup_active = GenericGaugeVec::new(
     449          517 :             Opts::new(
     450          517 :                 "safekeeper_wal_backup_active",
     451          517 :                 "Reports 1 for timelines with active WAL backup, 0 otherwise",
     452          517 :             ),
     453          517 :             &["tenant_id", "timeline_id"],
     454          517 :         )
     455          517 :         .unwrap();
     456          517 :         descs.extend(wal_backup_active.desc().into_iter().cloned());
     457          517 : 
     458          517 :         let connected_computes = IntGaugeVec::new(
     459          517 :             Opts::new(
     460          517 :                 "safekeeper_connected_computes",
     461          517 :                 "Number of active compute connections",
     462          517 :             ),
     463          517 :             &["tenant_id", "timeline_id"],
     464          517 :         )
     465          517 :         .unwrap();
     466          517 :         descs.extend(connected_computes.desc().into_iter().cloned());
     467          517 : 
     468          517 :         let disk_usage = GenericGaugeVec::new(
     469          517 :             Opts::new(
     470          517 :                 "safekeeper_disk_usage_bytes",
     471          517 :                 "Estimated disk space used to store WAL segments",
     472          517 :             ),
     473          517 :             &["tenant_id", "timeline_id"],
     474          517 :         )
     475          517 :         .unwrap();
     476          517 :         descs.extend(disk_usage.desc().into_iter().cloned());
     477          517 : 
     478          517 :         let acceptor_term = GenericGaugeVec::new(
     479          517 :             Opts::new("safekeeper_acceptor_term", "Current consensus term"),
     480          517 :             &["tenant_id", "timeline_id"],
     481          517 :         )
     482          517 :         .unwrap();
     483          517 :         descs.extend(acceptor_term.desc().into_iter().cloned());
     484          517 : 
     485          517 :         let written_wal_bytes = GenericGaugeVec::new(
     486          517 :             Opts::new(
     487          517 :                 "safekeeper_written_wal_bytes_total",
     488          517 :                 "Number of WAL bytes written to disk, grouped by timeline",
     489          517 :             ),
     490          517 :             &["tenant_id", "timeline_id"],
     491          517 :         )
     492          517 :         .unwrap();
     493          517 :         descs.extend(written_wal_bytes.desc().into_iter().cloned());
     494          517 : 
     495          517 :         let written_wal_seconds = GaugeVec::new(
     496          517 :             Opts::new(
     497          517 :                 "safekeeper_written_wal_seconds_total",
     498          517 :                 "Total time spent in write(2) writing WAL to disk, grouped by timeline",
     499          517 :             ),
     500          517 :             &["tenant_id", "timeline_id"],
     501          517 :         )
     502          517 :         .unwrap();
     503          517 :         descs.extend(written_wal_seconds.desc().into_iter().cloned());
     504          517 : 
     505          517 :         let flushed_wal_seconds = GaugeVec::new(
     506          517 :             Opts::new(
     507          517 :                 "safekeeper_flushed_wal_seconds_total",
     508          517 :                 "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
     509          517 :             ),
     510          517 :             &["tenant_id", "timeline_id"],
     511          517 :         )
     512          517 :         .unwrap();
     513          517 :         descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
     514          517 : 
     515          517 :         let collect_timeline_metrics = Gauge::new(
     516          517 :             "safekeeper_collect_timeline_metrics_seconds",
     517          517 :             "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
     518          517 :         )
     519          517 :         .unwrap();
     520          517 :         descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
     521          517 : 
     522          517 :         let timelines_count = IntGauge::new(
     523          517 :             "safekeeper_timelines",
     524          517 :             "Total number of timelines loaded in-memory",
     525          517 :         )
     526          517 :         .unwrap();
     527          517 :         descs.extend(timelines_count.desc().into_iter().cloned());
     528          517 : 
     529          517 :         TimelineCollector {
     530          517 :             descs,
     531          517 :             commit_lsn,
     532          517 :             backup_lsn,
     533          517 :             flush_lsn,
     534          517 :             epoch_start_lsn,
     535          517 :             peer_horizon_lsn,
     536          517 :             remote_consistent_lsn,
     537          517 :             ps_last_received_lsn,
     538          517 :             feedback_last_time_seconds,
     539          517 :             timeline_active,
     540          517 :             wal_backup_active,
     541          517 :             connected_computes,
     542          517 :             disk_usage,
     543          517 :             acceptor_term,
     544          517 :             written_wal_bytes,
     545          517 :             written_wal_seconds,
     546          517 :             flushed_wal_seconds,
     547          517 :             collect_timeline_metrics,
     548          517 :             timelines_count,
     549          517 :         }
     550          517 :     }
     551              : }
     552              : 
     553              : impl Collector for TimelineCollector {
     554          517 :     fn desc(&self) -> Vec<&Desc> {
     555          517 :         self.descs.iter().collect()
     556          517 :     }
     557              : 
     558           19 :     fn collect(&self) -> Vec<MetricFamily> {
     559           19 :         let start_collecting = Instant::now();
     560           19 : 
     561           19 :         // reset all metrics to clean up inactive timelines
     562           19 :         self.commit_lsn.reset();
     563           19 :         self.backup_lsn.reset();
     564           19 :         self.flush_lsn.reset();
     565           19 :         self.epoch_start_lsn.reset();
     566           19 :         self.peer_horizon_lsn.reset();
     567           19 :         self.remote_consistent_lsn.reset();
     568           19 :         self.ps_last_received_lsn.reset();
     569           19 :         self.feedback_last_time_seconds.reset();
     570           19 :         self.timeline_active.reset();
     571           19 :         self.wal_backup_active.reset();
     572           19 :         self.connected_computes.reset();
     573           19 :         self.disk_usage.reset();
     574           19 :         self.acceptor_term.reset();
     575           19 :         self.written_wal_bytes.reset();
     576           19 :         self.written_wal_seconds.reset();
     577           19 :         self.flushed_wal_seconds.reset();
     578           19 : 
     579           19 :         let timelines = GlobalTimelines::get_all();
     580           19 :         let timelines_count = timelines.len();
     581           19 : 
     582           19 :         // Prometheus Collector is sync, and data is stored under async lock. To
     583           19 :         // bridge the gap with a crutch, collect data in spawned thread with
     584           19 :         // local tokio runtime.
     585           19 :         let infos = std::thread::spawn(|| {
     586           19 :             let rt = tokio::runtime::Builder::new_current_thread()
     587           19 :                 .build()
     588           19 :                 .expect("failed to create rt");
     589           19 :             rt.block_on(collect_timeline_metrics())
     590           19 :         })
     591           19 :         .join()
     592           19 :         .expect("collect_timeline_metrics thread panicked");
     593              : 
     594           70 :         for tli in &infos {
     595           51 :             let tenant_id = tli.ttid.tenant_id.to_string();
     596           51 :             let timeline_id = tli.ttid.timeline_id.to_string();
     597           51 :             let labels = &[tenant_id.as_str(), timeline_id.as_str()];
     598           51 : 
     599           51 :             self.commit_lsn
     600           51 :                 .with_label_values(labels)
     601           51 :                 .set(tli.mem_state.commit_lsn.into());
     602           51 :             self.backup_lsn
     603           51 :                 .with_label_values(labels)
     604           51 :                 .set(tli.mem_state.backup_lsn.into());
     605           51 :             self.flush_lsn
     606           51 :                 .with_label_values(labels)
     607           51 :                 .set(tli.flush_lsn.into());
     608           51 :             self.epoch_start_lsn
     609           51 :                 .with_label_values(labels)
     610           51 :                 .set(tli.epoch_start_lsn.into());
     611           51 :             self.peer_horizon_lsn
     612           51 :                 .with_label_values(labels)
     613           51 :                 .set(tli.mem_state.peer_horizon_lsn.into());
     614           51 :             self.remote_consistent_lsn
     615           51 :                 .with_label_values(labels)
     616           51 :                 .set(tli.remote_consistent_lsn.into());
     617           51 :             self.timeline_active
     618           51 :                 .with_label_values(labels)
     619           51 :                 .set(tli.timeline_is_active as u64);
     620           51 :             self.wal_backup_active
     621           51 :                 .with_label_values(labels)
     622           51 :                 .set(tli.wal_backup_active as u64);
     623           51 :             self.connected_computes
     624           51 :                 .with_label_values(labels)
     625           51 :                 .set(tli.num_computes as i64);
     626           51 :             self.acceptor_term
     627           51 :                 .with_label_values(labels)
     628           51 :                 .set(tli.persisted_state.acceptor_state.term);
     629           51 :             self.written_wal_bytes
     630           51 :                 .with_label_values(labels)
     631           51 :                 .set(tli.wal_storage.write_wal_bytes);
     632           51 :             self.written_wal_seconds
     633           51 :                 .with_label_values(labels)
     634           51 :                 .set(tli.wal_storage.write_wal_seconds);
     635           51 :             self.flushed_wal_seconds
     636           51 :                 .with_label_values(labels)
     637           51 :                 .set(tli.wal_storage.flush_wal_seconds);
     638           51 : 
     639           51 :             self.ps_last_received_lsn
     640           51 :                 .with_label_values(labels)
     641           51 :                 .set(tli.ps_feedback.last_received_lsn.0);
     642           51 :             if let Ok(unix_time) = tli
     643           51 :                 .ps_feedback
     644           51 :                 .replytime
     645           51 :                 .duration_since(SystemTime::UNIX_EPOCH)
     646           51 :             {
     647           51 :                 self.feedback_last_time_seconds
     648           51 :                     .with_label_values(labels)
     649           51 :                     .set(unix_time.as_secs());
     650           51 :             }
     651              : 
     652           51 :             if tli.last_removed_segno != 0 {
     653            0 :                 let segno_count = tli
     654            0 :                     .flush_lsn
     655            0 :                     .segment_number(tli.persisted_state.server.wal_seg_size as usize)
     656            0 :                     - tli.last_removed_segno;
     657            0 :                 let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
     658            0 :                 self.disk_usage
     659            0 :                     .with_label_values(labels)
     660            0 :                     .set(disk_usage_bytes);
     661           51 :             }
     662              :         }
     663              : 
     664              :         // collect MetricFamilys.
     665           19 :         let mut mfs = Vec::new();
     666           19 :         mfs.extend(self.commit_lsn.collect());
     667           19 :         mfs.extend(self.backup_lsn.collect());
     668           19 :         mfs.extend(self.flush_lsn.collect());
     669           19 :         mfs.extend(self.epoch_start_lsn.collect());
     670           19 :         mfs.extend(self.peer_horizon_lsn.collect());
     671           19 :         mfs.extend(self.remote_consistent_lsn.collect());
     672           19 :         mfs.extend(self.ps_last_received_lsn.collect());
     673           19 :         mfs.extend(self.feedback_last_time_seconds.collect());
     674           19 :         mfs.extend(self.timeline_active.collect());
     675           19 :         mfs.extend(self.wal_backup_active.collect());
     676           19 :         mfs.extend(self.connected_computes.collect());
     677           19 :         mfs.extend(self.disk_usage.collect());
     678           19 :         mfs.extend(self.acceptor_term.collect());
     679           19 :         mfs.extend(self.written_wal_bytes.collect());
     680           19 :         mfs.extend(self.written_wal_seconds.collect());
     681           19 :         mfs.extend(self.flushed_wal_seconds.collect());
     682           19 : 
     683           19 :         // report time it took to collect all info
     684           19 :         let elapsed = start_collecting.elapsed().as_secs_f64();
     685           19 :         self.collect_timeline_metrics.set(elapsed);
     686           19 :         mfs.extend(self.collect_timeline_metrics.collect());
     687           19 : 
     688           19 :         // report total number of timelines
     689           19 :         self.timelines_count.set(timelines_count as i64);
     690           19 :         mfs.extend(self.timelines_count.collect());
     691           19 : 
     692           19 :         mfs
     693           19 :     }
     694              : }
     695              : 
     696           19 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
     697           19 :     let mut res = vec![];
     698           19 :     let timelines = GlobalTimelines::get_all();
     699              : 
     700           70 :     for tli in timelines {
     701           51 :         if let Some(info) = tli.info_for_metrics().await {
     702           51 :             res.push(info);
     703           51 :         }
     704              :     }
     705           19 :     res
     706           19 : }
        

Generated by: LCOV version 2.1-beta