LCOV - code coverage report
Current view: top level - safekeeper/src - metrics.rs (source / functions) Coverage Total Hit
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info Lines: 1.1 % 525 6
Test Date: 2024-07-03 15:33:13 Functions: 4.3 % 46 2

            Line data    Source code
       1              : //! Global safekeeper metrics and per-timeline safekeeper metrics.
       2              : 
       3              : use std::{
       4              :     sync::{Arc, RwLock},
       5              :     time::{Instant, SystemTime},
       6              : };
       7              : 
       8              : use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_FSYNC_SECONDS_BUCKETS};
       9              : use anyhow::Result;
      10              : use futures::Future;
      11              : use metrics::{
      12              :     core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
      13              :     proto::MetricFamily,
      14              :     register_histogram_vec, register_int_counter, register_int_counter_pair,
      15              :     register_int_counter_pair_vec, register_int_counter_vec, Gauge, HistogramVec, IntCounter,
      16              :     IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
      17              : };
      18              : use once_cell::sync::Lazy;
      19              : 
      20              : use postgres_ffi::XLogSegNo;
      21              : use utils::pageserver_feedback::PageserverFeedback;
      22              : use utils::{id::TenantTimelineId, lsn::Lsn};
      23              : 
      24              : use crate::{
      25              :     state::{TimelineMemState, TimelinePersistentState},
      26              :     GlobalTimelines,
      27              : };
      28              : 
      29              : // Global metrics across all timelines.
      30            0 : pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
      31              :     register_histogram!(
      32              :         "safekeeper_write_wal_bytes",
      33              :         "Bytes written to WAL in a single request",
      34              :         vec![
      35              :             1.0,
      36              :             10.0,
      37              :             100.0,
      38              :             1024.0,
      39              :             8192.0,
      40              :             128.0 * 1024.0,
      41              :             1024.0 * 1024.0,
      42              :             10.0 * 1024.0 * 1024.0
      43              :         ]
      44              :     )
      45            0 :     .expect("Failed to register safekeeper_write_wal_bytes histogram")
      46            0 : });
      47            0 : pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      48              :     register_histogram!(
      49              :         "safekeeper_write_wal_seconds",
      50              :         "Seconds spent writing and syncing WAL to a disk in a single request",
      51              :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
      52              :     )
      53            0 :     .expect("Failed to register safekeeper_write_wal_seconds histogram")
      54            0 : });
      55            0 : pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      56              :     register_histogram!(
      57              :         "safekeeper_flush_wal_seconds",
      58              :         "Seconds spent syncing WAL to a disk",
      59              :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
      60              :     )
      61            0 :     .expect("Failed to register safekeeper_flush_wal_seconds histogram")
      62            0 : });
      63            4 : pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
      64              :     register_histogram!(
      65              :         "safekeeper_persist_control_file_seconds",
      66              :         "Seconds to persist and sync control file",
      67              :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
      68              :     )
      69            4 :     .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
      70            4 : });
      71            0 : pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
      72              :     register_histogram_vec!(
      73              :         "safekeeper_wal_storage_operation_seconds",
      74              :         "Seconds spent on WAL storage operations",
      75              :         &["operation"],
      76              :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
      77              :     )
      78            0 :     .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
      79            0 : });
      80           18 : pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
      81              :     register_histogram_vec!(
      82              :         "safekeeper_misc_operation_seconds",
      83              :         "Seconds spent on miscellaneous operations",
      84              :         &["operation"],
      85              :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
      86              :     )
      87           18 :     .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
      88           18 : });
      89            0 : pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
      90              :     register_int_counter_vec!(
      91              :         "safekeeper_pg_io_bytes_total",
      92              :         "Bytes read from or written to any PostgreSQL connection",
      93              :         &["client_az", "sk_az", "app_name", "dir", "same_az"]
      94              :     )
      95            0 :     .expect("Failed to register safekeeper_pg_io_bytes gauge")
      96            0 : });
      97            0 : pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
      98              :     register_int_counter!(
      99              :         "safekeeper_broker_pushed_updates_total",
     100              :         "Number of timeline updates pushed to the broker"
     101              :     )
     102            0 :     .expect("Failed to register safekeeper_broker_pushed_updates_total counter")
     103            0 : });
     104            0 : pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
     105              :     register_int_counter_vec!(
     106              :         "safekeeper_broker_pulled_updates_total",
     107              :         "Number of timeline updates pulled and processed from the broker",
     108              :         &["result"]
     109              :     )
     110            0 :     .expect("Failed to register safekeeper_broker_pulled_updates_total counter")
     111            0 : });
     112            0 : pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
     113              :     register_int_counter_pair_vec!(
     114              :         "safekeeper_pg_queries_received_total",
     115              :         "Number of queries received through pg protocol",
     116              :         "safekeeper_pg_queries_finished_total",
     117              :         "Number of queries finished through pg protocol",
     118              :         &["query"]
     119              :     )
     120            0 :     .expect("Failed to register safekeeper_pg_queries_finished_total counter")
     121            0 : });
     122            0 : pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     123              :     register_int_counter!(
     124              :         "safekeeper_removed_wal_segments_total",
     125              :         "Number of WAL segments removed from the disk"
     126              :     )
     127            0 :     .expect("Failed to register safekeeper_removed_wal_segments_total counter")
     128            0 : });
     129            0 : pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
     130              :     register_int_counter!(
     131              :         "safekeeper_backed_up_segments_total",
     132              :         "Number of WAL segments backed up to the S3"
     133              :     )
     134            0 :     .expect("Failed to register safekeeper_backed_up_segments_total counter")
     135            0 : });
     136            0 : pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
     137              :     register_int_counter!(
     138              :         "safekeeper_backup_errors_total",
     139              :         "Number of errors during backup"
     140              :     )
     141            0 :     .expect("Failed to register safekeeper_backup_errors_total counter")
     142            0 : });
     143            0 : pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     144              :     register_histogram!(
     145              :         "safekeeper_broker_push_update_seconds",
     146              :         "Seconds to push all timeline updates to the broker",
     147              :         DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     148              :     )
     149            0 :     .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
     150            0 : });
     151              : pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
     152              :     1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
     153              : ];
     154            0 : pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
     155              :     register_histogram!(
     156              :         "safekeeper_broker_iteration_timelines",
     157              :         "Count of timelines pushed to the broker in a single iteration",
     158              :         TIMELINES_COUNT_BUCKETS.to_vec()
     159              :     )
     160            0 :     .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
     161            0 : });
     162            0 : pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
     163              :     register_int_counter!(
     164              :         "safekeeper_received_ps_feedbacks_total",
     165              :         "Number of pageserver feedbacks received"
     166              :     )
     167            0 :     .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
     168            0 : });
     169            0 : pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
     170              :     register_int_counter_vec!(
     171              :         "safekeeper_partial_backup_uploads_total",
     172              :         "Number of partial backup uploads to the S3",
     173              :         &["result"]
     174              :     )
     175            0 :     .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
     176            0 : });
     177            0 : pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
     178              :     register_int_counter!(
     179              :         "safekeeper_partial_backup_uploaded_bytes_total",
     180              :         "Number of bytes uploaded to the S3 during partial backup"
     181              :     )
     182            0 :     .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
     183            0 : });
     184            0 : pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
     185              :     register_int_counter!(
     186              :         "safekeeper_manager_iterations_total",
     187              :         "Number of iterations of the timeline manager task"
     188              :     )
     189            0 :     .expect("Failed to register safekeeper_manager_iterations_total counter")
     190            0 : });
     191            0 : pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
     192              :     register_int_counter!(
     193              :         "safekeeper_manager_active_changes_total",
     194              :         "Number of timeline active status changes in the timeline manager task"
     195              :     )
     196            0 :     .expect("Failed to register safekeeper_manager_active_changes_total counter")
     197            0 : });
     198            0 : pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
     199              :     register_int_counter_pair!(
     200              :         "safekeeper_wal_backup_tasks_started_total",
     201              :         "Number of active WAL backup tasks",
     202              :         "safekeeper_wal_backup_tasks_finished_total",
     203              :         "Number of finished WAL backup tasks",
     204              :     )
     205            0 :     .expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
     206            0 : });
     207              : 
     208              : pub const LABEL_UNKNOWN: &str = "unknown";
     209              : 
     210              : /// Labels for traffic metrics.
     211              : #[derive(Clone)]
     212              : struct ConnectionLabels {
     213              :     /// Availability zone of the connection origin.
     214              :     client_az: String,
     215              :     /// Availability zone of the current safekeeper.
     216              :     sk_az: String,
     217              :     /// Client application name.
     218              :     app_name: String,
     219              : }
     220              : 
     221              : impl ConnectionLabels {
     222            0 :     fn new() -> Self {
     223            0 :         Self {
     224            0 :             client_az: LABEL_UNKNOWN.to_string(),
     225            0 :             sk_az: LABEL_UNKNOWN.to_string(),
     226            0 :             app_name: LABEL_UNKNOWN.to_string(),
     227            0 :         }
     228            0 :     }
     229              : 
     230            0 :     fn build_metrics(
     231            0 :         &self,
     232            0 :     ) -> (
     233            0 :         GenericCounter<metrics::core::AtomicU64>,
     234            0 :         GenericCounter<metrics::core::AtomicU64>,
     235            0 :     ) {
     236            0 :         let same_az = match (self.client_az.as_str(), self.sk_az.as_str()) {
     237            0 :             (LABEL_UNKNOWN, _) | (_, LABEL_UNKNOWN) => LABEL_UNKNOWN,
     238            0 :             (client_az, sk_az) => {
     239            0 :                 if client_az == sk_az {
     240            0 :                     "true"
     241              :                 } else {
     242            0 :                     "false"
     243              :                 }
     244              :             }
     245              :         };
     246              : 
     247            0 :         let read = PG_IO_BYTES.with_label_values(&[
     248            0 :             &self.client_az,
     249            0 :             &self.sk_az,
     250            0 :             &self.app_name,
     251            0 :             "read",
     252            0 :             same_az,
     253            0 :         ]);
     254            0 :         let write = PG_IO_BYTES.with_label_values(&[
     255            0 :             &self.client_az,
     256            0 :             &self.sk_az,
     257            0 :             &self.app_name,
     258            0 :             "write",
     259            0 :             same_az,
     260            0 :         ]);
     261            0 :         (read, write)
     262            0 :     }
     263              : }
     264              : 
     265              : struct TrafficMetricsState {
     266              :     /// Labels for traffic metrics.
     267              :     labels: ConnectionLabels,
     268              :     /// Total bytes read from this connection.
     269              :     read: GenericCounter<metrics::core::AtomicU64>,
     270              :     /// Total bytes written to this connection.
     271              :     write: GenericCounter<metrics::core::AtomicU64>,
     272              : }
     273              : 
     274              : /// Metrics for measuring traffic (r/w bytes) in a single PostgreSQL connection.
     275              : #[derive(Clone)]
     276              : pub struct TrafficMetrics {
     277              :     state: Arc<RwLock<TrafficMetricsState>>,
     278              : }
     279              : 
     280              : impl Default for TrafficMetrics {
     281            0 :     fn default() -> Self {
     282            0 :         Self::new()
     283            0 :     }
     284              : }
     285              : 
     286              : impl TrafficMetrics {
     287            0 :     pub fn new() -> Self {
     288            0 :         let labels = ConnectionLabels::new();
     289            0 :         let (read, write) = labels.build_metrics();
     290            0 :         let state = TrafficMetricsState {
     291            0 :             labels,
     292            0 :             read,
     293            0 :             write,
     294            0 :         };
     295            0 :         Self {
     296            0 :             state: Arc::new(RwLock::new(state)),
     297            0 :         }
     298            0 :     }
     299              : 
     300            0 :     pub fn set_client_az(&self, value: &str) {
     301            0 :         let mut state = self.state.write().unwrap();
     302            0 :         state.labels.client_az = value.to_string();
     303            0 :         (state.read, state.write) = state.labels.build_metrics();
     304            0 :     }
     305              : 
     306            0 :     pub fn set_sk_az(&self, value: &str) {
     307            0 :         let mut state = self.state.write().unwrap();
     308            0 :         state.labels.sk_az = value.to_string();
     309            0 :         (state.read, state.write) = state.labels.build_metrics();
     310            0 :     }
     311              : 
     312            0 :     pub fn set_app_name(&self, value: &str) {
     313            0 :         let mut state = self.state.write().unwrap();
     314            0 :         state.labels.app_name = value.to_string();
     315            0 :         (state.read, state.write) = state.labels.build_metrics();
     316            0 :     }
     317              : 
     318            0 :     pub fn observe_read(&self, cnt: usize) {
     319            0 :         self.state.read().unwrap().read.inc_by(cnt as u64)
     320            0 :     }
     321              : 
     322            0 :     pub fn observe_write(&self, cnt: usize) {
     323            0 :         self.state.read().unwrap().write.inc_by(cnt as u64)
     324            0 :     }
     325              : }
     326              : 
     327              : /// Metrics for WalStorage in a single timeline.
     328              : #[derive(Clone, Default)]
     329              : pub struct WalStorageMetrics {
     330              :     /// How many bytes were written in total.
     331              :     write_wal_bytes: u64,
     332              :     /// How much time was spent writing WAL to disk, waiting for write(2).
     333              :     write_wal_seconds: f64,
     334              :     /// How much time was spent syncing WAL to disk, waiting for fsync(2).
     335              :     flush_wal_seconds: f64,
     336              : }
     337              : 
     338              : impl WalStorageMetrics {
     339            0 :     pub fn observe_write_bytes(&mut self, bytes: usize) {
     340            0 :         self.write_wal_bytes += bytes as u64;
     341            0 :         WRITE_WAL_BYTES.observe(bytes as f64);
     342            0 :     }
     343              : 
     344            0 :     pub fn observe_write_seconds(&mut self, seconds: f64) {
     345            0 :         self.write_wal_seconds += seconds;
     346            0 :         WRITE_WAL_SECONDS.observe(seconds);
     347            0 :     }
     348              : 
     349            0 :     pub fn observe_flush_seconds(&mut self, seconds: f64) {
     350            0 :         self.flush_wal_seconds += seconds;
     351            0 :         FLUSH_WAL_SECONDS.observe(seconds);
     352            0 :     }
     353              : }
     354              : 
     355              : /// Awaits a future yielding an empty result; on success returns its execution time in seconds, on failure propagates the error.
     356            0 : pub async fn time_io_closure<E: Into<anyhow::Error>>(
     357            0 :     closure: impl Future<Output = Result<(), E>>,
     358            0 : ) -> Result<f64> {
     359            0 :     let start = std::time::Instant::now();
     360            0 :     closure.await.map_err(|e| e.into())?;
     361            0 :     Ok(start.elapsed().as_secs_f64())
     362            0 : }
     363              : 
     364              : /// Metrics for a single timeline.
     365              : #[derive(Clone)]
     366              : pub struct FullTimelineInfo {
     367              :     pub ttid: TenantTimelineId,
     368              :     pub ps_feedback_count: u64,
     369              :     pub last_ps_feedback: PageserverFeedback,
     370              :     pub wal_backup_active: bool,
     371              :     pub timeline_is_active: bool,
     372              :     pub num_computes: u32,
     373              :     pub last_removed_segno: XLogSegNo,
     374              : 
     375              :     pub epoch_start_lsn: Lsn,
     376              :     pub mem_state: TimelineMemState,
     377              :     pub persisted_state: TimelinePersistentState,
     378              : 
     379              :     pub flush_lsn: Lsn,
     380              : 
     381              :     pub wal_storage: WalStorageMetrics,
     382              : }
     383              : 
     384              : /// Collects metrics for all active timelines.
     385              : pub struct TimelineCollector {
     386              :     descs: Vec<Desc>,
     387              :     commit_lsn: GenericGaugeVec<AtomicU64>,
     388              :     backup_lsn: GenericGaugeVec<AtomicU64>,
     389              :     flush_lsn: GenericGaugeVec<AtomicU64>,
     390              :     epoch_start_lsn: GenericGaugeVec<AtomicU64>,
     391              :     peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
     392              :     remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
     393              :     ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
     394              :     feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
     395              :     ps_feedback_count: GenericGaugeVec<AtomicU64>,
     396              :     timeline_active: GenericGaugeVec<AtomicU64>,
     397              :     wal_backup_active: GenericGaugeVec<AtomicU64>,
     398              :     connected_computes: IntGaugeVec,
     399              :     disk_usage: GenericGaugeVec<AtomicU64>,
     400              :     acceptor_term: GenericGaugeVec<AtomicU64>,
     401              :     written_wal_bytes: GenericGaugeVec<AtomicU64>,
     402              :     written_wal_seconds: GaugeVec,
     403              :     flushed_wal_seconds: GaugeVec,
     404              :     collect_timeline_metrics: Gauge,
     405              :     timelines_count: IntGauge,
     406              :     active_timelines_count: IntGauge,
     407              : }
     408              : 
     409              : impl Default for TimelineCollector {
     410            0 :     fn default() -> Self {
     411            0 :         Self::new()
     412            0 :     }
     413              : }
     414              : 
     415              : impl TimelineCollector {
     416            0 :     pub fn new() -> TimelineCollector {
     417            0 :         let mut descs = Vec::new();
     418            0 : 
     419            0 :         let commit_lsn = GenericGaugeVec::new(
     420            0 :             Opts::new(
     421            0 :                 "safekeeper_commit_lsn",
     422            0 :                 "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
     423            0 :             ),
     424            0 :             &["tenant_id", "timeline_id"],
     425            0 :         )
     426            0 :         .unwrap();
     427            0 :         descs.extend(commit_lsn.desc().into_iter().cloned());
     428            0 : 
     429            0 :         let backup_lsn = GenericGaugeVec::new(
     430            0 :             Opts::new(
     431            0 :                 "safekeeper_backup_lsn",
     432            0 :                 "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
     433            0 :             ),
     434            0 :             &["tenant_id", "timeline_id"],
     435            0 :         )
     436            0 :         .unwrap();
     437            0 :         descs.extend(backup_lsn.desc().into_iter().cloned());
     438            0 : 
     439            0 :         let flush_lsn = GenericGaugeVec::new(
     440            0 :             Opts::new(
     441            0 :                 "safekeeper_flush_lsn",
     442            0 :                 "Current flush_lsn, grouped by timeline",
     443            0 :             ),
     444            0 :             &["tenant_id", "timeline_id"],
     445            0 :         )
     446            0 :         .unwrap();
     447            0 :         descs.extend(flush_lsn.desc().into_iter().cloned());
     448            0 : 
     449            0 :         let epoch_start_lsn = GenericGaugeVec::new(
     450            0 :             Opts::new(
     451            0 :                 "safekeeper_epoch_start_lsn",
     452            0 :                 "Point since which compute generates new WAL in the current consensus term",
     453            0 :             ),
     454            0 :             &["tenant_id", "timeline_id"],
     455            0 :         )
     456            0 :         .unwrap();
     457            0 :         descs.extend(epoch_start_lsn.desc().into_iter().cloned());
     458            0 : 
     459            0 :         let peer_horizon_lsn = GenericGaugeVec::new(
     460            0 :             Opts::new(
     461            0 :                 "safekeeper_peer_horizon_lsn",
     462            0 :                 "LSN of the most lagging safekeeper",
     463            0 :             ),
     464            0 :             &["tenant_id", "timeline_id"],
     465            0 :         )
     466            0 :         .unwrap();
     467            0 :         descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
     468            0 : 
     469            0 :         let remote_consistent_lsn = GenericGaugeVec::new(
     470            0 :             Opts::new(
     471            0 :                 "safekeeper_remote_consistent_lsn",
     472            0 :                 "LSN which is persisted to the remote storage in pageserver",
     473            0 :             ),
     474            0 :             &["tenant_id", "timeline_id"],
     475            0 :         )
     476            0 :         .unwrap();
     477            0 :         descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
     478            0 : 
     479            0 :         let ps_last_received_lsn = GenericGaugeVec::new(
     480            0 :             Opts::new(
     481            0 :                 "safekeeper_ps_last_received_lsn",
     482            0 :                 "Last LSN received by the pageserver, acknowledged in the feedback",
     483            0 :             ),
     484            0 :             &["tenant_id", "timeline_id"],
     485            0 :         )
     486            0 :         .unwrap();
     487            0 :         descs.extend(ps_last_received_lsn.desc().into_iter().cloned());
     488            0 : 
     489            0 :         let feedback_last_time_seconds = GenericGaugeVec::new(
     490            0 :             Opts::new(
     491            0 :                 "safekeeper_feedback_last_time_seconds",
     492            0 :                 "Timestamp of the last feedback from the pageserver",
     493            0 :             ),
     494            0 :             &["tenant_id", "timeline_id"],
     495            0 :         )
     496            0 :         .unwrap();
     497            0 :         descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
     498            0 : 
     499            0 :         let ps_feedback_count = GenericGaugeVec::new(
     500            0 :             Opts::new(
     501            0 :                 "safekeeper_ps_feedback_count_total",
     502            0 :                 "Number of feedbacks received from the pageserver",
     503            0 :             ),
     504            0 :             &["tenant_id", "timeline_id"],
     505            0 :         )
     506            0 :         .unwrap();
     507            0 : 
     508            0 :         let timeline_active = GenericGaugeVec::new(
     509            0 :             Opts::new(
     510            0 :                 "safekeeper_timeline_active",
     511            0 :                 "Reports 1 for active timelines, 0 for inactive",
     512            0 :             ),
     513            0 :             &["tenant_id", "timeline_id"],
     514            0 :         )
     515            0 :         .unwrap();
     516            0 :         descs.extend(timeline_active.desc().into_iter().cloned());
     517            0 : 
     518            0 :         let wal_backup_active = GenericGaugeVec::new(
     519            0 :             Opts::new(
     520            0 :                 "safekeeper_wal_backup_active",
     521            0 :                 "Reports 1 for timelines with active WAL backup, 0 otherwise",
     522            0 :             ),
     523            0 :             &["tenant_id", "timeline_id"],
     524            0 :         )
     525            0 :         .unwrap();
     526            0 :         descs.extend(wal_backup_active.desc().into_iter().cloned());
     527            0 : 
     528            0 :         let connected_computes = IntGaugeVec::new(
     529            0 :             Opts::new(
     530            0 :                 "safekeeper_connected_computes",
     531            0 :                 "Number of active compute connections",
     532            0 :             ),
     533            0 :             &["tenant_id", "timeline_id"],
     534            0 :         )
     535            0 :         .unwrap();
     536            0 :         descs.extend(connected_computes.desc().into_iter().cloned());
     537            0 : 
     538            0 :         let disk_usage = GenericGaugeVec::new(
     539            0 :             Opts::new(
     540            0 :                 "safekeeper_disk_usage_bytes",
     541            0 :                 "Estimated disk space used to store WAL segments",
     542            0 :             ),
     543            0 :             &["tenant_id", "timeline_id"],
     544            0 :         )
     545            0 :         .unwrap();
     546            0 :         descs.extend(disk_usage.desc().into_iter().cloned());
     547            0 : 
     548            0 :         let acceptor_term = GenericGaugeVec::new(
     549            0 :             Opts::new("safekeeper_acceptor_term", "Current consensus term"),
     550            0 :             &["tenant_id", "timeline_id"],
     551            0 :         )
     552            0 :         .unwrap();
     553            0 :         descs.extend(acceptor_term.desc().into_iter().cloned());
     554            0 : 
     555            0 :         let written_wal_bytes = GenericGaugeVec::new(
     556            0 :             Opts::new(
     557            0 :                 "safekeeper_written_wal_bytes_total",
     558            0 :                 "Number of WAL bytes written to disk, grouped by timeline",
     559            0 :             ),
     560            0 :             &["tenant_id", "timeline_id"],
     561            0 :         )
     562            0 :         .unwrap();
     563            0 :         descs.extend(written_wal_bytes.desc().into_iter().cloned());
     564            0 : 
     565            0 :         let written_wal_seconds = GaugeVec::new(
     566            0 :             Opts::new(
     567            0 :                 "safekeeper_written_wal_seconds_total",
     568            0 :                 "Total time spent in write(2) writing WAL to disk, grouped by timeline",
     569            0 :             ),
     570            0 :             &["tenant_id", "timeline_id"],
     571            0 :         )
     572            0 :         .unwrap();
     573            0 :         descs.extend(written_wal_seconds.desc().into_iter().cloned());
     574            0 : 
     575            0 :         let flushed_wal_seconds = GaugeVec::new(
     576            0 :             Opts::new(
     577            0 :                 "safekeeper_flushed_wal_seconds_total",
     578            0 :                 "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
     579            0 :             ),
     580            0 :             &["tenant_id", "timeline_id"],
     581            0 :         )
     582            0 :         .unwrap();
     583            0 :         descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
     584            0 : 
     585            0 :         let collect_timeline_metrics = Gauge::new(
     586            0 :             "safekeeper_collect_timeline_metrics_seconds",
     587            0 :             "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
     588            0 :         )
     589            0 :         .unwrap();
     590            0 :         descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
     591            0 : 
     592            0 :         let timelines_count = IntGauge::new(
     593            0 :             "safekeeper_timelines",
     594            0 :             "Total number of timelines loaded in-memory",
     595            0 :         )
     596            0 :         .unwrap();
     597            0 :         descs.extend(timelines_count.desc().into_iter().cloned());
     598            0 : 
     599            0 :         let active_timelines_count = IntGauge::new(
     600            0 :             "safekeeper_active_timelines",
     601            0 :             "Total number of active timelines",
     602            0 :         )
     603            0 :         .unwrap();
     604            0 :         descs.extend(active_timelines_count.desc().into_iter().cloned());
     605            0 : 
     606            0 :         TimelineCollector {
     607            0 :             descs,
     608            0 :             commit_lsn,
     609            0 :             backup_lsn,
     610            0 :             flush_lsn,
     611            0 :             epoch_start_lsn,
     612            0 :             peer_horizon_lsn,
     613            0 :             remote_consistent_lsn,
     614            0 :             ps_last_received_lsn,
     615            0 :             feedback_last_time_seconds,
     616            0 :             ps_feedback_count,
     617            0 :             timeline_active,
     618            0 :             wal_backup_active,
     619            0 :             connected_computes,
     620            0 :             disk_usage,
     621            0 :             acceptor_term,
     622            0 :             written_wal_bytes,
     623            0 :             written_wal_seconds,
     624            0 :             flushed_wal_seconds,
     625            0 :             collect_timeline_metrics,
     626            0 :             timelines_count,
     627            0 :             active_timelines_count,
     628            0 :         }
     629            0 :     }
     630              : }
     631              : 
     632              : impl Collector for TimelineCollector {
                            /// Returns references to every descriptor stored in `self.descs`.
     633            0 :     fn desc(&self) -> Vec<&Desc> {
     634            0 :         self.descs.iter().collect()
     635            0 :     }
     636              : 
                            /// Rebuilds all per-timeline metrics from scratch and returns them as
                            /// metric families.
                            ///
                            /// Steps, in order: reset every per-timeline vector, read the total
                            /// timeline count, gather `FullTimelineInfo` values on a helper thread,
                            /// set one sample per metric for each gathered timeline, then collect
                            /// the families together with the collection-duration gauge and the two
                            /// timeline-count gauges.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the helper thread's tokio runtime cannot be built or if the
                            /// helper thread itself panics (see the two `expect` calls below).
     637            0 :     fn collect(&self) -> Vec<MetricFamily> {
     638            0 :         let start_collecting = Instant::now();
     639            0 : 
     640            0 :         // reset all metrics to clean up inactive timelines
     641            0 :         self.commit_lsn.reset();
     642            0 :         self.backup_lsn.reset();
     643            0 :         self.flush_lsn.reset();
     644            0 :         self.epoch_start_lsn.reset();
     645            0 :         self.peer_horizon_lsn.reset();
     646            0 :         self.remote_consistent_lsn.reset();
     647            0 :         self.ps_last_received_lsn.reset();
     648            0 :         self.feedback_last_time_seconds.reset();
     649            0 :         self.ps_feedback_count.reset();
     650            0 :         self.timeline_active.reset();
     651            0 :         self.wal_backup_active.reset();
     652            0 :         self.connected_computes.reset();
     653            0 :         self.disk_usage.reset();
     654            0 :         self.acceptor_term.reset();
     655            0 :         self.written_wal_bytes.reset();
     656            0 :         self.written_wal_seconds.reset();
     657            0 :         self.flushed_wal_seconds.reset();
     658            0 : 
                                // The total comes from `GlobalTimelines::get_all()`, while the active
                                // count is tallied in the loop below from the gathered infos only.
     659            0 :         let timelines_count = GlobalTimelines::get_all().len();
     660            0 :         let mut active_timelines_count = 0;
     661            0 : 
     662            0 :         // Prometheus Collector is sync, and data is stored under async lock. To
     663            0 :         // bridge the gap with a crutch, collect data in spawned thread with
     664            0 :         // local tokio runtime.
                                // NOTE(review): the builder enables no IO/time drivers, so the awaited
                                // code presumably needs neither -- confirm.
     665            0 :         let infos = std::thread::spawn(|| {
     666            0 :             let rt = tokio::runtime::Builder::new_current_thread()
     667            0 :                 .build()
     668            0 :                 .expect("failed to create rt");
     669            0 :             rt.block_on(collect_timeline_metrics())
     670            0 :         })
     671            0 :         .join()
     672            0 :         .expect("collect_timeline_metrics thread panicked");
     673              : 
     674            0 :         for tli in &infos {
                                    // Every per-timeline sample is labelled with (tenant_id, timeline_id).
     675            0 :             let tenant_id = tli.ttid.tenant_id.to_string();
     676            0 :             let timeline_id = tli.ttid.timeline_id.to_string();
     677            0 :             let labels = &[tenant_id.as_str(), timeline_id.as_str()];
     678            0 : 
     679            0 :             if tli.timeline_is_active {
     680            0 :                 active_timelines_count += 1;
     681            0 :             }
     682              : 
     683            0 :             self.commit_lsn
     684            0 :                 .with_label_values(labels)
     685            0 :                 .set(tli.mem_state.commit_lsn.into());
     686            0 :             self.backup_lsn
     687            0 :                 .with_label_values(labels)
     688            0 :                 .set(tli.mem_state.backup_lsn.into());
     689            0 :             self.flush_lsn
     690            0 :                 .with_label_values(labels)
     691            0 :                 .set(tli.flush_lsn.into());
     692            0 :             self.epoch_start_lsn
     693            0 :                 .with_label_values(labels)
     694            0 :                 .set(tli.epoch_start_lsn.into());
     695            0 :             self.peer_horizon_lsn
     696            0 :                 .with_label_values(labels)
     697            0 :                 .set(tli.mem_state.peer_horizon_lsn.into());
     698            0 :             self.remote_consistent_lsn
     699            0 :                 .with_label_values(labels)
     700            0 :                 .set(tli.mem_state.remote_consistent_lsn.into());
     701            0 :             self.timeline_active
     702            0 :                 .with_label_values(labels)
     703            0 :                 .set(tli.timeline_is_active as u64);
     704            0 :             self.wal_backup_active
     705            0 :                 .with_label_values(labels)
     706            0 :                 .set(tli.wal_backup_active as u64);
     707            0 :             self.connected_computes
     708            0 :                 .with_label_values(labels)
     709            0 :                 .set(tli.num_computes as i64);
     710            0 :             self.acceptor_term
     711            0 :                 .with_label_values(labels)
     712            0 :                 .set(tli.persisted_state.acceptor_state.term);
     713            0 :             self.written_wal_bytes
     714            0 :                 .with_label_values(labels)
     715            0 :                 .set(tli.wal_storage.write_wal_bytes);
     716            0 :             self.written_wal_seconds
     717            0 :                 .with_label_values(labels)
     718            0 :                 .set(tli.wal_storage.write_wal_seconds);
     719            0 :             self.flushed_wal_seconds
     720            0 :                 .with_label_values(labels)
     721            0 :                 .set(tli.wal_storage.flush_wal_seconds);
     722            0 : 
     723            0 :             self.ps_last_received_lsn
     724            0 :                 .with_label_values(labels)
     725            0 :                 .set(tli.last_ps_feedback.last_received_lsn.0);
     726            0 :             self.ps_feedback_count
     727            0 :                 .with_label_values(labels)
     728            0 :                 .set(tli.ps_feedback_count);
                                    // `duration_since` fails when `replytime` is earlier than the Unix
                                    // epoch; in that case no sample is emitted for this timeline.
     729            0 :             if let Ok(unix_time) = tli
     730            0 :                 .last_ps_feedback
     731            0 :                 .replytime
     732            0 :                 .duration_since(SystemTime::UNIX_EPOCH)
     733            0 :             {
     734            0 :                 self.feedback_last_time_seconds
     735            0 :                     .with_label_values(labels)
     736            0 :                     .set(unix_time.as_secs());
     737            0 :             }
     738              : 
                                    // Disk usage is only reported when `last_removed_segno` is non-zero.
                                    // It is (segment number of flush_lsn - last_removed_segno) multiplied
                                    // by the WAL segment size.
                                    // NOTE(review): the subtraction is unchecked; it presumably relies on
                                    // last_removed_segno never exceeding the flush segment -- confirm.
     739            0 :             if tli.last_removed_segno != 0 {
     740            0 :                 let segno_count = tli
     741            0 :                     .flush_lsn
     742            0 :                     .segment_number(tli.persisted_state.server.wal_seg_size as usize)
     743            0 :                     - tli.last_removed_segno;
     744            0 :                 let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
     745            0 :                 self.disk_usage
     746            0 :                     .with_label_values(labels)
     747            0 :                     .set(disk_usage_bytes);
     748            0 :             }
     749              :         }
     750              : 
     751              :         // collect MetricFamilys.
     752            0 :         let mut mfs = Vec::new();
     753            0 :         mfs.extend(self.commit_lsn.collect());
     754            0 :         mfs.extend(self.backup_lsn.collect());
     755            0 :         mfs.extend(self.flush_lsn.collect());
     756            0 :         mfs.extend(self.epoch_start_lsn.collect());
     757            0 :         mfs.extend(self.peer_horizon_lsn.collect());
     758            0 :         mfs.extend(self.remote_consistent_lsn.collect());
     759            0 :         mfs.extend(self.ps_last_received_lsn.collect());
     760            0 :         mfs.extend(self.feedback_last_time_seconds.collect());
     761            0 :         mfs.extend(self.ps_feedback_count.collect());
     762            0 :         mfs.extend(self.timeline_active.collect());
     763            0 :         mfs.extend(self.wal_backup_active.collect());
     764            0 :         mfs.extend(self.connected_computes.collect());
     765            0 :         mfs.extend(self.disk_usage.collect());
     766            0 :         mfs.extend(self.acceptor_term.collect());
     767            0 :         mfs.extend(self.written_wal_bytes.collect());
     768            0 :         mfs.extend(self.written_wal_seconds.collect());
     769            0 :         mfs.extend(self.flushed_wal_seconds.collect());
     770            0 : 
     771            0 :         // report time it took to collect all info
                                // Measured from the top of this function, so it also covers the resets
                                // and the per-timeline family collection above.
     772            0 :         let elapsed = start_collecting.elapsed().as_secs_f64();
     773            0 :         self.collect_timeline_metrics.set(elapsed);
     774            0 :         mfs.extend(self.collect_timeline_metrics.collect());
     775            0 : 
     776            0 :         // report total number of timelines
     777            0 :         self.timelines_count.set(timelines_count as i64);
     778            0 :         mfs.extend(self.timelines_count.collect());
     779            0 : 
                                // Number of gathered infos whose `timeline_is_active` flag was set.
     780            0 :         self.active_timelines_count
     781            0 :             .set(active_timelines_count as i64);
     782            0 :         mfs.extend(self.active_timelines_count.collect());
     783            0 : 
     784            0 :         mfs
     785            0 :     }
     786              : }
     787              : 
     788            0 : async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
                            // Gathers a `FullTimelineInfo` for each timeline returned by
                            // `GlobalTimelines::get_global_broker_active_set().get_all()`.
                            // Timelines are awaited one after another; any for which
                            // `info_for_metrics()` yields `None` are left out of the result.
     789            0 :     let mut res = vec![];
     790            0 :     let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
     791              : 
     792            0 :     for tli in active_timelines {
     793            0 :         if let Some(info) = tli.info_for_metrics().await {
     794            0 :             res.push(info);
     795            0 :         }
     796              :     }
     797            0 :     res
     798            0 : }
        

Generated by: LCOV version 2.1-beta