LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 69.0 % 313 216
Test Date: 2025-07-16 12:29:03 Functions: 55.3 % 38 21

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::time::SystemTime;
       3              : 
       4              : use chrono::{DateTime, Utc};
       5              : use consumption_metrics::EventType;
       6              : use futures::stream::StreamExt;
       7              : use utils::id::{TenantId, TimelineId};
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use super::{Cache, NewRawMetric};
      11              : use crate::context::RequestContext;
      12              : use crate::tenant::mgr::TenantManager;
      13              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      14              : 
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of static str.
///
/// Every variant carries an explicit `#[serde(rename = "...")]`, so the serialized string is
/// fixed by that attribute rather than by the Rust identifier.
// Do not rename any of these without first consulting with data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute.
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental: the change relative to the previously reported
    /// `written_size`, reported as 0 while the previous value is ahead of the current one.
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Written bytes only on this timeline (not including ancestors):
    /// written_size - ancestor_lsn
    ///
    /// On the root branch, this is equivalent to `written_size`.
    #[serde(rename = "written_size_since_parent")]
    WrittenSizeSinceParent,
    /// PITR history size only on this timeline (not including ancestors):
    /// last_record_lsn - max(pitr_cutoff, ancestor_lsn).
    ///
    /// On the root branch, this is its entire PITR history size. Not emitted if GC hasn't computed
    /// the PITR cutoff yet. 0 if PITR is disabled.
    #[serde(rename = "pitr_history_size_since_parent")]
    PitrHistorySizeSinceParent,
    /// Timeline logical size. Only the exact size is reported; when it is not available the
    /// previously cached value (if any) is re-sent instead.
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant synthetic size. A value of zero is never sent.
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}
      50              : 
/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    /// Tenant the metric is reported for.
    pub(super) tenant_id: TenantId,

    /// Timeline the metric is reported for. `None` for the tenant-level metrics
    /// (`remote_storage_size`, `synthetic_storage_size`), in which case the field is left out
    /// of the serialized form entirely.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    /// Which metric this key names.
    pub(super) metric: Name,
}
      64              : 
      65              : impl MetricsKey {
      66          128 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      67          128 :         AbsoluteValueFactory(self)
      68          128 :     }
      69           32 :     const fn incremental_values(self) -> IncrementalValueFactory {
      70           32 :         IncrementalValueFactory(self)
      71           32 :     }
      72              : }
      73              : 
      74              : /// Helper type which each individual metric kind can return to produce only absolute values.
      75              : struct AbsoluteValueFactory(MetricsKey);
      76              : 
      77              : impl AbsoluteValueFactory {
      78              :     #[cfg(test)]
      79            6 :     const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
      80            6 :         let key = self.0;
      81            6 :         (key, (EventType::Absolute { time }, val))
      82            6 :     }
      83              : 
      84          111 :     const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
      85          111 :         let key = self.0;
      86          111 :         NewRawMetric {
      87          111 :             key,
      88          111 :             kind: EventType::Absolute { time },
      89          111 :             value: val,
      90          111 :         }
      91          111 :     }
      92              : 
      93           11 :     fn key(&self) -> &MetricsKey {
      94           11 :         &self.0
      95           11 :     }
      96              : }
      97              : 
      98              : /// Helper type which each individual metric kind can return to produce only incremental values.
      99              : struct IncrementalValueFactory(MetricsKey);
     100              : 
     101              : impl IncrementalValueFactory {
     102              :     #[allow(clippy::wrong_self_convention)]
     103           31 :     const fn from_until(
     104           31 :         self,
     105           31 :         prev_end: DateTime<Utc>,
     106           31 :         up_to: DateTime<Utc>,
     107           31 :         val: u64,
     108           31 :     ) -> NewRawMetric {
     109           31 :         let key = self.0;
     110              :         // cannot assert prev_end < up_to because these are realtime clock based
     111           31 :         let when = EventType::Incremental {
     112           31 :             start_time: prev_end,
     113           31 :             stop_time: up_to,
     114           31 :         };
     115           31 :         NewRawMetric {
     116           31 :             key,
     117           31 :             kind: when,
     118           31 :             value: val,
     119           31 :         }
     120           31 :     }
     121              : 
     122              :     #[allow(clippy::wrong_self_convention)]
     123              :     #[cfg(test)]
     124            1 :     const fn from_until_old_format(
     125            1 :         self,
     126            1 :         prev_end: DateTime<Utc>,
     127            1 :         up_to: DateTime<Utc>,
     128            1 :         val: u64,
     129            1 :     ) -> super::RawMetric {
     130            1 :         let key = self.0;
     131              :         // cannot assert prev_end < up_to because these are realtime clock based
     132            1 :         let when = EventType::Incremental {
     133            1 :             start_time: prev_end,
     134            1 :             stop_time: up_to,
     135            1 :         };
     136            1 :         (key, (when, val))
     137            1 :     }
     138              : 
     139           12 :     fn key(&self) -> &MetricsKey {
     140           12 :         &self.0
     141           12 :     }
     142              : }
     143              : 
     144              : // the static part of a MetricsKey
     145              : impl MetricsKey {
     146              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     147              :     ///
     148              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     149           33 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     150           33 :         MetricsKey {
     151           33 :             tenant_id,
     152           33 :             timeline_id: Some(timeline_id),
     153           33 :             metric: Name::WrittenSize,
     154           33 :         }
     155           33 :         .absolute_values()
     156           33 :     }
     157              : 
     158              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     159              :     /// previously sent, starting from the previously sent incremental time range ending at the
     160              :     /// latest absolute measurement.
     161           32 :     const fn written_size_delta(
     162           32 :         tenant_id: TenantId,
     163           32 :         timeline_id: TimelineId,
     164           32 :     ) -> IncrementalValueFactory {
     165           32 :         MetricsKey {
     166           32 :             tenant_id,
     167           32 :             timeline_id: Some(timeline_id),
     168           32 :             metric: Name::WrittenSizeDelta,
     169           32 :         }
     170           32 :         .incremental_values()
     171           32 :     }
     172              : 
     173              :     /// `written_size` - `ancestor_lsn`.
     174           28 :     const fn written_size_since_parent(
     175           28 :         tenant_id: TenantId,
     176           28 :         timeline_id: TimelineId,
     177           28 :     ) -> AbsoluteValueFactory {
     178           28 :         MetricsKey {
     179           28 :             tenant_id,
     180           28 :             timeline_id: Some(timeline_id),
     181           28 :             metric: Name::WrittenSizeSinceParent,
     182           28 :         }
     183           28 :         .absolute_values()
     184           28 :     }
     185              : 
     186              :     /// `written_size` - max(`pitr_cutoff`, `ancestor_lsn`).
     187           27 :     const fn pitr_history_size_since_parent(
     188           27 :         tenant_id: TenantId,
     189           27 :         timeline_id: TimelineId,
     190           27 :     ) -> AbsoluteValueFactory {
     191           27 :         MetricsKey {
     192           27 :             tenant_id,
     193           27 :             timeline_id: Some(timeline_id),
     194           27 :             metric: Name::PitrHistorySizeSinceParent,
     195           27 :         }
     196           27 :         .absolute_values()
     197           27 :     }
     198              : 
     199              :     /// Exact [`Timeline::get_current_logical_size`].
     200              :     ///
     201              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     202           22 :     const fn timeline_logical_size(
     203           22 :         tenant_id: TenantId,
     204           22 :         timeline_id: TimelineId,
     205           22 :     ) -> AbsoluteValueFactory {
     206           22 :         MetricsKey {
     207           22 :             tenant_id,
     208           22 :             timeline_id: Some(timeline_id),
     209           22 :             metric: Name::LogicalSize,
     210           22 :         }
     211           22 :         .absolute_values()
     212           22 :     }
     213              : 
     214              :     /// [`TenantShard::remote_size`]
     215              :     ///
     216              :     /// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
     217            9 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     218            9 :         MetricsKey {
     219            9 :             tenant_id,
     220            9 :             timeline_id: None,
     221            9 :             metric: Name::RemoteSize,
     222            9 :         }
     223            9 :         .absolute_values()
     224            9 :     }
     225              : 
     226              :     /// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     227              :     ///
     228              :     /// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
     229              :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     230            9 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     231            9 :         MetricsKey {
     232            9 :             tenant_id,
     233            9 :             timeline_id: None,
     234            9 :             metric: Name::SyntheticSize,
     235            9 :         }
     236            9 :         .absolute_values()
     237            9 :     }
     238              : }
     239              : 
     240            0 : pub(super) async fn collect_all_metrics(
     241            0 :     tenant_manager: &Arc<TenantManager>,
     242            0 :     cached_metrics: &Cache,
     243            0 :     ctx: &RequestContext,
     244            0 : ) -> Vec<NewRawMetric> {
     245              :     use pageserver_api::models::TenantState;
     246              : 
     247            0 :     let started_at = std::time::Instant::now();
     248              : 
     249            0 :     let tenants = match tenant_manager.list_tenants() {
     250            0 :         Ok(tenants) => tenants,
     251            0 :         Err(err) => {
     252            0 :             tracing::error!("failed to list tenants: {:?}", err);
     253            0 :             return vec![];
     254              :         }
     255              :     };
     256              : 
     257            0 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
     258            0 :         if state != TenantState::Active || !id.is_shard_zero() {
     259            0 :             None
     260              :         } else {
     261            0 :             tenant_manager
     262            0 :                 .get_attached_tenant_shard(id)
     263            0 :                 .ok()
     264            0 :                 .map(|tenant| (id.tenant_id, tenant))
     265              :         }
     266            0 :     });
     267              : 
     268            0 :     let res = collect(tenants, cached_metrics, ctx).await;
     269              : 
     270            0 :     tracing::info!(
     271            0 :         elapsed_ms = started_at.elapsed().as_millis(),
     272            0 :         total = res.len(),
     273            0 :         "collected metrics"
     274              :     );
     275              : 
     276            0 :     res
     277            0 : }
     278              : 
     279            0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
     280            0 : where
     281            0 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
     282            0 : {
     283            0 :     let mut current_metrics: Vec<NewRawMetric> = Vec::new();
     284              : 
     285            0 :     let mut tenants = std::pin::pin!(tenants);
     286              : 
     287            0 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     288            0 :         let timelines = tenant.list_timelines();
     289            0 :         for timeline in timelines {
     290            0 :             let timeline_id = timeline.timeline_id;
     291              : 
     292            0 :             match TimelineSnapshot::collect(&timeline, ctx) {
     293            0 :                 Ok(Some(snap)) => {
     294            0 :                     snap.to_metrics(
     295            0 :                         tenant_id,
     296            0 :                         timeline_id,
     297            0 :                         Utc::now(),
     298            0 :                         &mut current_metrics,
     299            0 :                         cache,
     300            0 :                     );
     301            0 :                 }
     302            0 :                 Ok(None) => {}
     303            0 :                 Err(e) => {
     304            0 :                     tracing::error!(
     305            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     306            0 :                         timeline.timeline_id
     307              :                     );
     308            0 :                     continue;
     309              :                 }
     310              :             }
     311              :         }
     312              : 
     313            0 :         let snap = TenantSnapshot::collect(&tenant);
     314            0 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     315              :     }
     316              : 
     317            0 :     current_metrics
     318            0 : }
     319              : 
/// In-between abstraction to allow testing metrics without actual Tenants.
///
/// Holds the plain values that the tenant-level metrics are produced from.
struct TenantSnapshot {
    /// Value reported under `remote_storage_size`.
    remote_size: u64,
    /// Value reported under `synthetic_storage_size`; zero is treated as "no value" when
    /// converting to metrics.
    synthetic_size: u64,
}
     325              : 
     326              : impl TenantSnapshot {
     327              :     /// Collect tenant status to have metrics created out of it.
     328            0 :     fn collect(t: &Arc<crate::tenant::TenantShard>) -> Self {
     329            0 :         TenantSnapshot {
     330            0 :             remote_size: t.remote_size(),
     331            0 :             // Note that this metric is calculated in a separate bgworker
     332            0 :             // Here we only use cached value, which may lag behind the real latest one
     333            0 :             synthetic_size: t.cached_synthetic_size(),
     334            0 :         }
     335            0 :     }
     336              : 
     337            2 :     fn to_metrics(
     338            2 :         &self,
     339            2 :         tenant_id: TenantId,
     340            2 :         now: DateTime<Utc>,
     341            2 :         cached: &Cache,
     342            2 :         metrics: &mut Vec<NewRawMetric>,
     343            2 :     ) {
     344            2 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     345              : 
     346            2 :         let synthetic_size = {
     347            2 :             let factory = MetricsKey::synthetic_size(tenant_id);
     348            2 :             let mut synthetic_size = self.synthetic_size;
     349              : 
     350            2 :             if synthetic_size == 0 {
     351            2 :                 if let Some(item) = cached.get(factory.key()) {
     352            1 :                     // use the latest value from previous session, TODO: check generation number
     353            1 :                     synthetic_size = item.value;
     354            1 :                 }
     355            0 :             }
     356              : 
     357            2 :             if synthetic_size != 0 {
     358              :                 // only send non-zeroes because otherwise these show up as errors in logs
     359            1 :                 Some(factory.at(now, synthetic_size))
     360              :             } else {
     361            1 :                 None
     362              :             }
     363              :         };
     364              : 
     365            2 :         metrics.extend([Some(remote_size), synthetic_size].into_iter().flatten());
     366            2 :     }
     367              : }
     368              : 
/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
    /// LSN and wall-clock time pair; used as the baseline for the written-size delta when the
    /// cache has no previous `written_size` for the timeline.
    loaded_at: (Lsn, SystemTime),
    /// Reported as the timeline's `written_size`.
    last_record_lsn: Lsn,
    /// Subtracted from the written size to obtain the branch-local ("since parent") sizes.
    ancestor_lsn: Lsn,
    /// The logical size if the exact value is known, `None` otherwise.
    current_exact_logical_size: Option<u64>,
    /// Whether PITR is enabled (pitr_interval > 0).
    pitr_enabled: bool,
    /// The PITR cutoff LSN. None if not yet initialized. If PITR is disabled, this is approximately
    /// Some(last_record_lsn), but may lag behind it since it's computed periodically.
    pitr_cutoff: Option<Lsn>,
}
     384              : 
     385              : impl TimelineSnapshot {
     386              :     /// Collect the metrics from an actual timeline.
     387              :     ///
     388              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     389              :     ///
     390              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     391            0 :     fn collect(
     392            0 :         t: &Arc<crate::tenant::Timeline>,
     393            0 :         ctx: &RequestContext,
     394            0 :     ) -> anyhow::Result<Option<Self>> {
     395            0 :         if !t.is_active() {
     396              :             // no collection for broken or stopping needed, we will still keep the cached values
     397              :             // though at the caller.
     398            0 :             Ok(None)
     399              :         } else {
     400            0 :             let loaded_at = t.loaded_at;
     401            0 :             let last_record_lsn = t.get_last_record_lsn();
     402            0 :             let ancestor_lsn = t.get_ancestor_lsn();
     403            0 :             let pitr_enabled = !t.get_pitr_interval().is_zero();
     404            0 :             let pitr_cutoff = t.gc_info.read().unwrap().cutoffs.time;
     405              : 
     406            0 :             let current_exact_logical_size = {
     407            0 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     408            0 :                 let size = span.in_scope(|| {
     409            0 :                     t.get_current_logical_size(
     410            0 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     411            0 :                         ctx,
     412              :                     )
     413            0 :                 });
     414            0 :                 match size {
     415              :                     // Only send timeline logical size when it is fully calculated.
     416            0 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     417            0 :                     CurrentLogicalSize::Approximate(_) => None,
     418              :                 }
     419              :             };
     420              : 
     421            0 :             Ok(Some(TimelineSnapshot {
     422            0 :                 loaded_at,
     423            0 :                 last_record_lsn,
     424            0 :                 ancestor_lsn,
     425            0 :                 current_exact_logical_size,
     426            0 :                 pitr_enabled,
     427            0 :                 pitr_cutoff,
     428            0 :             }))
     429              :         }
     430            0 :     }
     431              : 
     432              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     433           12 :     fn to_metrics(
     434           12 :         &self,
     435           12 :         tenant_id: TenantId,
     436           12 :         timeline_id: TimelineId,
     437           12 :         now: DateTime<Utc>,
     438           12 :         metrics: &mut Vec<NewRawMetric>,
     439           12 :         cache: &Cache,
     440           12 :     ) {
     441           12 :         let timeline_written_size = u64::from(self.last_record_lsn);
     442              : 
     443           12 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     444              : 
     445           12 :         let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
     446            7 :             item.kind
     447            7 :                 .incremental_timerange()
     448            7 :                 .expect("never create EventType::Absolute for written_size_delta")
     449            7 :                 .end
     450            7 :         });
     451              : 
     452           12 :         let written_size_now =
     453           12 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     454              : 
     455              :         // by default, use the last sent written_size as the basis for
     456              :         // calculating the delta. if we don't yet have one, use the load time value.
     457           12 :         let prev: (DateTime<Utc>, u64) = cache
     458           12 :             .get(&written_size_now.key)
     459           12 :             .map(|item| {
     460              :                 // use the prev time from our last incremental update, or default to latest
     461              :                 // absolute update on the first round.
     462            8 :                 let prev_at = item
     463            8 :                     .kind
     464            8 :                     .absolute_time()
     465            8 :                     .expect("never create EventType::Incremental for written_size");
     466            8 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     467            8 :                 (*prev_at, item.value)
     468            8 :             })
     469           12 :             .unwrap_or_else(|| {
     470              :                 // if we don't have a previous point of comparison, compare to the load time
     471              :                 // lsn.
     472            4 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     473            4 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     474            4 :             });
     475              : 
     476           12 :         let up_to = now;
     477              : 
     478           12 :         let written_size_last = written_size_now.value.max(prev.1); // don't regress
     479              : 
     480           12 :         if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
     481            6 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     482            6 :             // written_size_delta
     483            6 :             metrics.push(key_value);
     484            6 :             // written_size
     485            6 :             metrics.push(written_size_now);
     486            6 :         } else {
     487            6 :             // the cached value was ahead of us, report zero until we've caught up
     488            6 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     489            6 :             // the cached value was ahead of us, report the same until we've caught up
     490            6 :             metrics.push(NewRawMetric {
     491            6 :                 key: written_size_now.key,
     492            6 :                 kind: written_size_now.kind,
     493            6 :                 value: prev.1,
     494            6 :             });
     495            6 :         }
     496              : 
     497              :         // Compute the branch-local written size.
     498           12 :         let written_size_since_parent_key =
     499           12 :             MetricsKey::written_size_since_parent(tenant_id, timeline_id);
     500           12 :         metrics.push(
     501           12 :             written_size_since_parent_key
     502           12 :                 .at(now, written_size_last.saturating_sub(self.ancestor_lsn.0)),
     503              :         );
     504              : 
     505              :         // Compute the branch-local PITR history size. Not emitted if GC hasn't yet computed the
     506              :         // PITR cutoff. 0 if PITR is disabled.
     507           12 :         let pitr_history_size_since_parent_key =
     508           12 :             MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id);
     509           12 :         if !self.pitr_enabled {
     510            1 :             metrics.push(pitr_history_size_since_parent_key.at(now, 0));
     511           11 :         } else if let Some(pitr_cutoff) = self.pitr_cutoff {
     512            9 :             metrics.push(pitr_history_size_since_parent_key.at(
     513            9 :                 now,
     514            9 :                 written_size_last.saturating_sub(pitr_cutoff.max(self.ancestor_lsn).0),
     515            9 :             ));
     516            9 :         }
     517              : 
     518              :         {
     519           12 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     520           12 :             let current_or_previous = self
     521           12 :                 .current_exact_logical_size
     522           12 :                 .or_else(|| cache.get(factory.key()).map(|item| item.value));
     523              : 
     524           12 :             if let Some(size) = current_or_previous {
     525            4 :                 metrics.push(factory.at(now, size));
     526            8 :             }
     527              :         }
     528           12 :     }
     529              : }
     530              : 
     531              : #[cfg(test)]
     532              : mod tests;
     533              : 
     534              : #[cfg(test)]
     535              : pub(crate) use tests::{metric_examples, metric_examples_old};
        

Generated by: LCOV version 2.1-beta