LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit
Test: 2620485e474b48c32427149a5d91ef8fc2cd649e.info Lines: 67.8 % 286 194
Test Date: 2025-05-01 22:50:11 Functions: 54.8 % 42 23

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::time::SystemTime;
       3              : 
       4              : use chrono::{DateTime, Utc};
       5              : use consumption_metrics::EventType;
       6              : use futures::stream::StreamExt;
       7              : use utils::id::{TenantId, TimelineId};
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use super::{Cache, NewRawMetric};
      11              : use crate::context::RequestContext;
      12              : use crate::tenant::mgr::TenantManager;
      13              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      14              : 
      15              : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
      16              : /// instead of static str.
      17              : // Do not rename any of these without first consulting with data team and partner
      18              : // management.
      19          420 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      20              : pub(super) enum Name {
      21              :     /// Timeline last_record_lsn, absolute
      22              :     #[serde(rename = "written_size")]
      23              :     WrittenSize,
      24              :     /// Timeline last_record_lsn, incremental
      25              :     #[serde(rename = "written_data_bytes_delta")]
      26              :     WrittenSizeDelta,
      27              :     /// Timeline logical size
      28              :     #[serde(rename = "timeline_logical_size")]
      29              :     LogicalSize,
      30              :     /// Tenant remote size
      31              :     #[serde(rename = "remote_storage_size")]
      32              :     RemoteSize,
      33              :     /// Tenant synthetic size
      34              :     #[serde(rename = "synthetic_storage_size")]
      35              :     SyntheticSize,
      36              : }
      37              : 
      38              : /// Key that uniquely identifies the object this metric describes.
      39              : ///
      40              : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
      41              : /// elsewhere.
      42          312 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      43              : pub(crate) struct MetricsKey {
      44              :     pub(super) tenant_id: TenantId,
      45              : 
      46              :     #[serde(skip_serializing_if = "Option::is_none")]
      47              :     pub(super) timeline_id: Option<TimelineId>,
      48              : 
      49              :     pub(super) metric: Name,
      50              : }
      51              : 
      52              : impl MetricsKey {
      53          588 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      54          588 :         AbsoluteValueFactory(self)
      55          588 :     }
      56          204 :     const fn incremental_values(self) -> IncrementalValueFactory {
      57          204 :         IncrementalValueFactory(self)
      58          204 :     }
      59              : }
      60              : 
      61              : /// Helper type which each individual metric kind can return to produce only absolute values.
      62              : struct AbsoluteValueFactory(MetricsKey);
      63              : 
      64              : impl AbsoluteValueFactory {
      65              :     #[cfg(test)]
      66           48 :     const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
      67           48 :         let key = self.0;
      68           48 :         (key, (EventType::Absolute { time }, val))
      69           48 :     }
      70              : 
      71          504 :     const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
      72          504 :         let key = self.0;
      73          504 :         NewRawMetric {
      74          504 :             key,
      75          504 :             kind: EventType::Absolute { time },
      76          504 :             value: val,
      77          504 :         }
      78          504 :     }
      79              : 
      80           60 :     fn key(&self) -> &MetricsKey {
      81           60 :         &self.0
      82           60 :     }
      83              : }
      84              : 
      85              : /// Helper type which each individual metric kind can return to produce only incremental values.
      86              : struct IncrementalValueFactory(MetricsKey);
      87              : 
      88              : impl IncrementalValueFactory {
      89              :     #[allow(clippy::wrong_self_convention)]
      90          192 :     const fn from_until(
      91          192 :         self,
      92          192 :         prev_end: DateTime<Utc>,
      93          192 :         up_to: DateTime<Utc>,
      94          192 :         val: u64,
      95          192 :     ) -> NewRawMetric {
      96          192 :         let key = self.0;
      97          192 :         // cannot assert prev_end < up_to because these are realtime clock based
      98          192 :         let when = EventType::Incremental {
      99          192 :             start_time: prev_end,
     100          192 :             stop_time: up_to,
     101          192 :         };
     102          192 :         NewRawMetric {
     103          192 :             key,
     104          192 :             kind: when,
     105          192 :             value: val,
     106          192 :         }
     107          192 :     }
     108              : 
     109              :     #[allow(clippy::wrong_self_convention)]
     110              :     #[cfg(test)]
     111           12 :     const fn from_until_old_format(
     112           12 :         self,
     113           12 :         prev_end: DateTime<Utc>,
     114           12 :         up_to: DateTime<Utc>,
     115           12 :         val: u64,
     116           12 :     ) -> super::RawMetric {
     117           12 :         let key = self.0;
     118           12 :         // cannot assert prev_end < up_to because these are realtime clock based
     119           12 :         let when = EventType::Incremental {
     120           12 :             start_time: prev_end,
     121           12 :             stop_time: up_to,
     122           12 :         };
     123           12 :         (key, (when, val))
     124           12 :     }
     125              : 
     126           72 :     fn key(&self) -> &MetricsKey {
     127           72 :         &self.0
     128           72 :     }
     129              : }
     130              : 
     131              : // the static part of a MetricsKey
     132              : impl MetricsKey {
     133              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     134              :     ///
     135              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     136          216 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     137          216 :         MetricsKey {
     138          216 :             tenant_id,
     139          216 :             timeline_id: Some(timeline_id),
     140          216 :             metric: Name::WrittenSize,
     141          216 :         }
     142          216 :         .absolute_values()
     143          216 :     }
     144              : 
     145              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     146              :     /// previously sent, starting from the previously sent incremental time range ending at the
     147              :     /// latest absolute measurement.
     148          204 :     const fn written_size_delta(
     149          204 :         tenant_id: TenantId,
     150          204 :         timeline_id: TimelineId,
     151          204 :     ) -> IncrementalValueFactory {
     152          204 :         MetricsKey {
     153          204 :             tenant_id,
     154          204 :             timeline_id: Some(timeline_id),
     155          204 :             metric: Name::WrittenSizeDelta,
     156          204 :         }
     157          204 :         .incremental_values()
     158          204 :     }
     159              : 
     160              :     /// Exact [`Timeline::get_current_logical_size`].
     161              :     ///
     162              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     163          180 :     const fn timeline_logical_size(
     164          180 :         tenant_id: TenantId,
     165          180 :         timeline_id: TimelineId,
     166          180 :     ) -> AbsoluteValueFactory {
     167          180 :         MetricsKey {
     168          180 :             tenant_id,
     169          180 :             timeline_id: Some(timeline_id),
     170          180 :             metric: Name::LogicalSize,
     171          180 :         }
     172          180 :         .absolute_values()
     173          180 :     }
     174              : 
     175              :     /// [`TenantShard::remote_size`]
     176              :     ///
     177              :     /// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
     178           96 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     179           96 :         MetricsKey {
     180           96 :             tenant_id,
     181           96 :             timeline_id: None,
     182           96 :             metric: Name::RemoteSize,
     183           96 :         }
     184           96 :         .absolute_values()
     185           96 :     }
     186              : 
     187              :     /// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     188              :     ///
     189              :     /// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
     190              :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     191           96 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     192           96 :         MetricsKey {
     193           96 :             tenant_id,
     194           96 :             timeline_id: None,
     195           96 :             metric: Name::SyntheticSize,
     196           96 :         }
     197           96 :         .absolute_values()
     198           96 :     }
     199              : }
     200              : 
     201            0 : pub(super) async fn collect_all_metrics(
     202            0 :     tenant_manager: &Arc<TenantManager>,
     203            0 :     cached_metrics: &Cache,
     204            0 :     ctx: &RequestContext,
     205            0 : ) -> Vec<NewRawMetric> {
     206              :     use pageserver_api::models::TenantState;
     207              : 
     208            0 :     let started_at = std::time::Instant::now();
     209              : 
     210            0 :     let tenants = match tenant_manager.list_tenants() {
     211            0 :         Ok(tenants) => tenants,
     212            0 :         Err(err) => {
     213            0 :             tracing::error!("failed to list tenants: {:?}", err);
     214            0 :             return vec![];
     215              :         }
     216              :     };
     217              : 
     218            0 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
     219            0 :         if state != TenantState::Active || !id.is_shard_zero() {
     220            0 :             None
     221              :         } else {
     222            0 :             tenant_manager
     223            0 :                 .get_attached_tenant_shard(id)
     224            0 :                 .ok()
     225            0 :                 .map(|tenant| (id.tenant_id, tenant))
     226              :         }
     227            0 :     });
     228              : 
     229            0 :     let res = collect(tenants, cached_metrics, ctx).await;
     230              : 
     231            0 :     tracing::info!(
     232            0 :         elapsed_ms = started_at.elapsed().as_millis(),
     233            0 :         total = res.len(),
     234            0 :         "collected metrics"
     235              :     );
     236              : 
     237            0 :     res
     238            0 : }
     239              : 
     240            0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
     241            0 : where
     242            0 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
     243            0 : {
     244            0 :     let mut current_metrics: Vec<NewRawMetric> = Vec::new();
     245            0 : 
     246            0 :     let mut tenants = std::pin::pin!(tenants);
     247              : 
     248            0 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     249            0 :         let timelines = tenant.list_timelines();
     250            0 :         for timeline in timelines {
     251            0 :             let timeline_id = timeline.timeline_id;
     252            0 : 
     253            0 :             match TimelineSnapshot::collect(&timeline, ctx) {
     254            0 :                 Ok(Some(snap)) => {
     255            0 :                     snap.to_metrics(
     256            0 :                         tenant_id,
     257            0 :                         timeline_id,
     258            0 :                         Utc::now(),
     259            0 :                         &mut current_metrics,
     260            0 :                         cache,
     261            0 :                     );
     262            0 :                 }
     263            0 :                 Ok(None) => {}
     264            0 :                 Err(e) => {
     265            0 :                     tracing::error!(
     266            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     267            0 :                         timeline.timeline_id
     268              :                     );
     269            0 :                     continue;
     270              :                 }
     271              :             }
     272              :         }
     273              : 
     274            0 :         let snap = TenantSnapshot::collect(&tenant);
     275            0 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     276              :     }
     277              : 
     278            0 :     current_metrics
     279            0 : }
     280              : 
     281              : /// In-between abstraction to allow testing metrics without actual Tenants.
     282              : struct TenantSnapshot {
     283              :     remote_size: u64,
     284              :     synthetic_size: u64,
     285              : }
     286              : 
     287              : impl TenantSnapshot {
     288              :     /// Collect tenant status to have metrics created out of it.
     289            0 :     fn collect(t: &Arc<crate::tenant::TenantShard>) -> Self {
     290            0 :         TenantSnapshot {
     291            0 :             remote_size: t.remote_size(),
     292            0 :             // Note that this metric is calculated in a separate bgworker
     293            0 :             // Here we only use cached value, which may lag behind the real latest one
     294            0 :             synthetic_size: t.cached_synthetic_size(),
     295            0 :         }
     296            0 :     }
     297              : 
     298           24 :     fn to_metrics(
     299           24 :         &self,
     300           24 :         tenant_id: TenantId,
     301           24 :         now: DateTime<Utc>,
     302           24 :         cached: &Cache,
     303           24 :         metrics: &mut Vec<NewRawMetric>,
     304           24 :     ) {
     305           24 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     306              : 
     307           24 :         let synthetic_size = {
     308           24 :             let factory = MetricsKey::synthetic_size(tenant_id);
     309           24 :             let mut synthetic_size = self.synthetic_size;
     310           24 : 
     311           24 :             if synthetic_size == 0 {
     312           24 :                 if let Some(item) = cached.get(factory.key()) {
     313           12 :                     // use the latest value from previous session, TODO: check generation number
     314           12 :                     synthetic_size = item.value;
     315           12 :                 }
     316            0 :             }
     317              : 
     318           24 :             if synthetic_size != 0 {
     319              :                 // only send non-zeroes because otherwise these show up as errors in logs
     320           12 :                 Some(factory.at(now, synthetic_size))
     321              :             } else {
     322           12 :                 None
     323              :             }
     324              :         };
     325              : 
     326           24 :         metrics.extend([Some(remote_size), synthetic_size].into_iter().flatten());
     327           24 :     }
     328              : }
     329              : 
     330              : /// Internal type to make timeline metric production testable.
     331              : ///
     332              : /// As this value type contains all of the information needed from a timeline to produce the
     333              : /// metrics, it can easily be created with different values in test.
     334              : struct TimelineSnapshot {
     335              :     loaded_at: (Lsn, SystemTime),
     336              :     last_record_lsn: Lsn,
     337              :     current_exact_logical_size: Option<u64>,
     338              : }
     339              : 
     340              : impl TimelineSnapshot {
     341              :     /// Collect the metrics from an actual timeline.
     342              :     ///
     343              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     344              :     ///
     345              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     346            0 :     fn collect(
     347            0 :         t: &Arc<crate::tenant::Timeline>,
     348            0 :         ctx: &RequestContext,
     349            0 :     ) -> anyhow::Result<Option<Self>> {
     350            0 :         if !t.is_active() {
     351              :             // no collection for broken or stopping needed, we will still keep the cached values
     352              :             // though at the caller.
     353            0 :             Ok(None)
     354              :         } else {
     355            0 :             let loaded_at = t.loaded_at;
     356            0 :             let last_record_lsn = t.get_last_record_lsn();
     357              : 
     358            0 :             let current_exact_logical_size = {
     359            0 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     360            0 :                 let size = span.in_scope(|| {
     361            0 :                     t.get_current_logical_size(
     362            0 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     363            0 :                         ctx,
     364            0 :                     )
     365            0 :                 });
     366            0 :                 match size {
     367              :                     // Only send timeline logical size when it is fully calculated.
     368            0 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     369            0 :                     CurrentLogicalSize::Approximate(_) => None,
     370              :                 }
     371              :             };
     372              : 
     373            0 :             Ok(Some(TimelineSnapshot {
     374            0 :                 loaded_at,
     375            0 :                 last_record_lsn,
     376            0 :                 current_exact_logical_size,
     377            0 :             }))
     378              :         }
     379            0 :     }
     380              : 
     381              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     382           72 :     fn to_metrics(
     383           72 :         &self,
     384           72 :         tenant_id: TenantId,
     385           72 :         timeline_id: TimelineId,
     386           72 :         now: DateTime<Utc>,
     387           72 :         metrics: &mut Vec<NewRawMetric>,
     388           72 :         cache: &Cache,
     389           72 :     ) {
     390           72 :         let timeline_written_size = u64::from(self.last_record_lsn);
     391           72 : 
     392           72 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     393           72 : 
     394           72 :         let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
     395           36 :             item.kind
     396           36 :                 .incremental_timerange()
     397           36 :                 .expect("never create EventType::Absolute for written_size_delta")
     398           36 :                 .end
     399           72 :         });
     400           72 : 
     401           72 :         let written_size_now =
     402           72 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     403           72 : 
     404           72 :         // by default, use the last sent written_size as the basis for
     405           72 :         // calculating the delta. if we don't yet have one, use the load time value.
     406           72 :         let prev: (DateTime<Utc>, u64) = cache
     407           72 :             .get(&written_size_now.key)
     408           72 :             .map(|item| {
     409           48 :                 // use the prev time from our last incremental update, or default to latest
     410           48 :                 // absolute update on the first round.
     411           48 :                 let prev_at = item
     412           48 :                     .kind
     413           48 :                     .absolute_time()
     414           48 :                     .expect("never create EventType::Incremental for written_size");
     415           48 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     416           48 :                 (*prev_at, item.value)
     417           72 :             })
     418           72 :             .unwrap_or_else(|| {
     419           24 :                 // if we don't have a previous point of comparison, compare to the load time
     420           24 :                 // lsn.
     421           24 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     422           24 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     423           72 :             });
     424           72 : 
     425           72 :         let up_to = now;
     426              : 
     427           72 :         if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
     428           48 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     429           48 :             // written_size_delta
     430           48 :             metrics.push(key_value);
     431           48 :             // written_size
     432           48 :             metrics.push(written_size_now);
     433           48 :         } else {
     434           24 :             // the cached value was ahead of us, report zero until we've caught up
     435           24 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     436           24 :             // the cached value was ahead of us, report the same until we've caught up
     437           24 :             metrics.push(NewRawMetric {
     438           24 :                 key: written_size_now.key,
     439           24 :                 kind: written_size_now.kind,
     440           24 :                 value: prev.1,
     441           24 :             });
     442           24 :         }
     443              : 
     444              :         {
     445           72 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     446           72 :             let current_or_previous = self
     447           72 :                 .current_exact_logical_size
     448           72 :                 .or_else(|| cache.get(factory.key()).map(|item| item.value));
     449              : 
     450           72 :             if let Some(size) = current_or_previous {
     451           48 :                 metrics.push(factory.at(now, size));
     452           48 :             }
     453              :         }
     454           72 :     }
     455              : }
     456              : 
     457              : #[cfg(test)]
     458              : mod tests;
     459              : 
     460              : #[cfg(test)]
     461              : pub(crate) use tests::{metric_examples, metric_examples_old};
        

Generated by: LCOV version 2.1-beta