LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - (source / functions) Coverage Total Hit
Test: Lines: 68.9 % 302 208
Test Date: 2025-03-18 18:33:46 Functions: 53.3 % 45 24

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::time::SystemTime;
       3              : 
       4              : use chrono::{DateTime, Utc};
       5              : use consumption_metrics::EventType;
       6              : use futures::stream::StreamExt;
       7              : use utils::id::{TenantId, TimelineId};
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use super::{Cache, NewRawMetric};
      11              : use crate::context::RequestContext;
      12              : use crate::tenant::mgr::TenantManager;
      13              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      14              : 
      15              : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
      16              : /// instead of static str.
      17              : // Do not rename any of these without first consulting with data team and partner
      18              : // management.
      19          192 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      20              : pub(super) enum Name {
      21              :     /// Timeline last_record_lsn, absolute
      22              :     #[serde(rename = "written_size")]
      23              :     WrittenSize,
      24              :     /// Timeline last_record_lsn, incremental
      25              :     #[serde(rename = "written_data_bytes_delta")]
      26              :     WrittenSizeDelta,
      27              :     /// Timeline logical size
      28              :     #[serde(rename = "timeline_logical_size")]
      29              :     LogicalSize,
      30              :     /// Tenant remote size
      31              :     #[serde(rename = "remote_storage_size")]
      32              :     RemoteSize,
      33              :     /// Tenant resident size
      34              :     #[serde(rename = "resident_size")]
      35              :     ResidentSize,
      36              :     /// Tenant synthetic size
      37              :     #[serde(rename = "synthetic_storage_size")]
      38              :     SyntheticSize,
      39              : }
      40              : 
      41              : /// Key that uniquely identifies the object this metric describes.
      42              : ///
      43              : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
      44              : /// elsewhere.
      45          120 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      46              : pub(crate) struct MetricsKey {
      47              :     pub(super) tenant_id: TenantId,
      48              : 
      49              :     #[serde(skip_serializing_if = "Option::is_none")]
      50              :     pub(super) timeline_id: Option<TimelineId>,
      51              : 
      52              :     pub(super) metric: Name,
      53              : }
      54              : 
      55              : impl MetricsKey {
      56          228 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      57          228 :         AbsoluteValueFactory(self)
      58          228 :     }
      59           68 :     const fn incremental_values(self) -> IncrementalValueFactory {
      60           68 :         IncrementalValueFactory(self)
      61           68 :     }
      62              : }
      63              : 
      64              : /// Helper type which each individual metric kind can return to produce only absolute values.
      65              : struct AbsoluteValueFactory(MetricsKey);
      66              : 
      67              : impl AbsoluteValueFactory {
      68              :     #[cfg(test)]
      69           20 :     const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
      70           20 :         let key = self.0;
      71           20 :         (key, (EventType::Absolute { time }, val))
      72           20 :     }
      73              : 
      74          196 :     const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
      75          196 :         let key = self.0;
      76          196 :         NewRawMetric {
      77          196 :             key,
      78          196 :             kind: EventType::Absolute { time },
      79          196 :             value: val,
      80          196 :         }
      81          196 :     }
      82              : 
      83           20 :     fn key(&self) -> &MetricsKey {
      84           20 :         &self.0
      85           20 :     }
      86              : }
      87              : 
      88              : /// Helper type which each individual metric kind can return to produce only incremental values.
      89              : struct IncrementalValueFactory(MetricsKey);
      90              : 
      91              : impl IncrementalValueFactory {
      92              :     #[allow(clippy::wrong_self_convention)]
      93           64 :     const fn from_until(
      94           64 :         self,
      95           64 :         prev_end: DateTime<Utc>,
      96           64 :         up_to: DateTime<Utc>,
      97           64 :         val: u64,
      98           64 :     ) -> NewRawMetric {
      99           64 :         let key = self.0;
     100           64 :         // cannot assert prev_end < up_to because these are realtime clock based
     101           64 :         let when = EventType::Incremental {
     102           64 :             start_time: prev_end,
     103           64 :             stop_time: up_to,
     104           64 :         };
     105           64 :         NewRawMetric {
     106           64 :             key,
     107           64 :             kind: when,
     108           64 :             value: val,
     109           64 :         }
     110           64 :     }
     111              : 
     112              :     #[allow(clippy::wrong_self_convention)]
     113              :     #[cfg(test)]
     114            4 :     const fn from_until_old_format(
     115            4 :         self,
     116            4 :         prev_end: DateTime<Utc>,
     117            4 :         up_to: DateTime<Utc>,
     118            4 :         val: u64,
     119            4 :     ) -> super::RawMetric {
     120            4 :         let key = self.0;
     121            4 :         // cannot assert prev_end < up_to because these are realtime clock based
     122            4 :         let when = EventType::Incremental {
     123            4 :             start_time: prev_end,
     124            4 :             stop_time: up_to,
     125            4 :         };
     126            4 :         (key, (when, val))
     127            4 :     }
     128              : 
     129           24 :     fn key(&self) -> &MetricsKey {
     130           24 :         &self.0
     131           24 :     }
     132              : }
     133              : 
     134              : // the static part of a MetricsKey
     135              : impl MetricsKey {
     136              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     137              :     ///
     138              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     139           72 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     140           72 :         MetricsKey {
     141           72 :             tenant_id,
     142           72 :             timeline_id: Some(timeline_id),
     143           72 :             metric: Name::WrittenSize,
     144           72 :         }
     145           72 :         .absolute_values()
     146           72 :     }
     147              : 
     148              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     149              :     /// previously sent, starting from the previously sent incremental time range ending at the
     150              :     /// latest absolute measurement.
     151           68 :     const fn written_size_delta(
     152           68 :         tenant_id: TenantId,
     153           68 :         timeline_id: TimelineId,
     154           68 :     ) -> IncrementalValueFactory {
     155           68 :         MetricsKey {
     156           68 :             tenant_id,
     157           68 :             timeline_id: Some(timeline_id),
     158           68 :             metric: Name::WrittenSizeDelta,
     159           68 :         }
     160           68 :         .incremental_values()
     161           68 :     }
     162              : 
     163              :     /// Exact [`Timeline::get_current_logical_size`].
     164              :     ///
     165              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     166           60 :     const fn timeline_logical_size(
     167           60 :         tenant_id: TenantId,
     168           60 :         timeline_id: TimelineId,
     169           60 :     ) -> AbsoluteValueFactory {
     170           60 :         MetricsKey {
     171           60 :             tenant_id,
     172           60 :             timeline_id: Some(timeline_id),
     173           60 :             metric: Name::LogicalSize,
     174           60 :         }
     175           60 :         .absolute_values()
     176           60 :     }
     177              : 
     178              :     /// [`Tenant::remote_size`]
     179              :     ///
     180              :     /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     181           32 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     182           32 :         MetricsKey {
     183           32 :             tenant_id,
     184           32 :             timeline_id: None,
     185           32 :             metric: Name::RemoteSize,
     186           32 :         }
     187           32 :         .absolute_values()
     188           32 :     }
     189              : 
     190              :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     191              :     ///
     192              :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     193           32 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     194           32 :         MetricsKey {
     195           32 :             tenant_id,
     196           32 :             timeline_id: None,
     197           32 :             metric: Name::ResidentSize,
     198           32 :         }
     199           32 :         .absolute_values()
     200           32 :     }
     201              : 
     202              :     /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     203              :     ///
     204              :     /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     205              :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     206           32 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     207           32 :         MetricsKey {
     208           32 :             tenant_id,
     209           32 :             timeline_id: None,
     210           32 :             metric: Name::SyntheticSize,
     211           32 :         }
     212           32 :         .absolute_values()
     213           32 :     }
     214              : }
     215              : 
     216            0 : pub(super) async fn collect_all_metrics(
     217            0 :     tenant_manager: &Arc<TenantManager>,
     218            0 :     cached_metrics: &Cache,
     219            0 :     ctx: &RequestContext,
     220            0 : ) -> Vec<NewRawMetric> {
     221              :     use pageserver_api::models::TenantState;
     222              : 
     223            0 :     let started_at = std::time::Instant::now();
     224              : 
     225            0 :     let tenants = match tenant_manager.list_tenants() {
     226            0 :         Ok(tenants) => tenants,
     227            0 :         Err(err) => {
     228            0 :             tracing::error!("failed to list tenants: {:?}", err);
     229            0 :             return vec![];
     230              :         }
     231              :     };
     232              : 
     233            0 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
     234            0 :         if state != TenantState::Active || !id.is_shard_zero() {
     235            0 :             None
     236              :         } else {
     237            0 :             tenant_manager
     238            0 :                 .get_attached_tenant_shard(id)
     239            0 :                 .ok()
     240            0 :                 .map(|tenant| (id.tenant_id, tenant))
     241              :         }
     242            0 :     });
     243              : 
     244            0 :     let res = collect(tenants, cached_metrics, ctx).await;
     245              : 
     246            0 :     tracing::info!(
     247            0 :         elapsed_ms = started_at.elapsed().as_millis(),
     248            0 :         total = res.len(),
     249            0 :         "collected metrics"
     250              :     );
     251              : 
     252            0 :     res
     253            0 : }
     254              : 
     255            0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
     256            0 : where
     257            0 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
     258            0 : {
     259            0 :     let mut current_metrics: Vec<NewRawMetric> = Vec::new();
     260            0 : 
     261            0 :     let mut tenants = std::pin::pin!(tenants);
     262              : 
     263            0 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     264            0 :         let mut tenant_resident_size = 0;
     265              : 
     266            0 :         for timeline in tenant.list_timelines() {
     267            0 :             let timeline_id = timeline.timeline_id;
     268            0 : 
     269            0 :             match TimelineSnapshot::collect(&timeline, ctx) {
     270            0 :                 Ok(Some(snap)) => {
     271            0 :                     snap.to_metrics(
     272            0 :                         tenant_id,
     273            0 :                         timeline_id,
     274            0 :                         Utc::now(),
     275            0 :                         &mut current_metrics,
     276            0 :                         cache,
     277            0 :                     );
     278            0 :                 }
     279            0 :                 Ok(None) => {}
     280            0 :                 Err(e) => {
     281            0 :                     tracing::error!(
     282            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     283            0 :                         timeline.timeline_id
     284              :                     );
     285            0 :                     continue;
     286              :                 }
     287              :             }
     288              : 
     289            0 :             tenant_resident_size += timeline.resident_physical_size();
     290              :         }
     291              : 
     292            0 :         let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
     293            0 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     294              :     }
     295              : 
     296            0 :     current_metrics
     297            0 : }
     298              : 
     299              : /// In-between abstraction to allow testing metrics without actual Tenants.
     300              : struct TenantSnapshot {
     301              :     resident_size: u64,
     302              :     remote_size: u64,
     303              :     synthetic_size: u64,
     304              : }
     305              : 
     306              : impl TenantSnapshot {
     307              :     /// Collect tenant status to have metrics created out of it.
     308              :     ///
     309              :     /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
     310              :     /// cannot just list timelines here.
     311            0 :     fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
     312            0 :         TenantSnapshot {
     313            0 :             resident_size,
     314            0 :             remote_size: t.remote_size(),
     315            0 :             // Note that this metric is calculated in a separate bgworker
     316            0 :             // Here we only use cached value, which may lag behind the real latest one
     317            0 :             synthetic_size: t.cached_synthetic_size(),
     318            0 :         }
     319            0 :     }
     320              : 
     321            8 :     fn to_metrics(
     322            8 :         &self,
     323            8 :         tenant_id: TenantId,
     324            8 :         now: DateTime<Utc>,
     325            8 :         cached: &Cache,
     326            8 :         metrics: &mut Vec<NewRawMetric>,
     327            8 :     ) {
     328            8 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     329            8 : 
     330            8 :         let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
     331              : 
     332            8 :         let synthetic_size = {
     333            8 :             let factory = MetricsKey::synthetic_size(tenant_id);
     334            8 :             let mut synthetic_size = self.synthetic_size;
     335            8 : 
     336            8 :             if synthetic_size == 0 {
     337            8 :                 if let Some(item) = cached.get(factory.key()) {
     338            4 :                     // use the latest value from previous session, TODO: check generation number
     339            4 :                     synthetic_size = item.value;
     340            4 :                 }
     341            0 :             }
     342              : 
     343            8 :             if synthetic_size != 0 {
     344              :                 // only send non-zeroes because otherwise these show up as errors in logs
     345            4 :                 Some(factory.at(now, synthetic_size))
     346              :             } else {
     347            4 :                 None
     348              :             }
     349              :         };
     350              : 
     351            8 :         metrics.extend(
     352            8 :             [Some(remote_size), Some(resident_size), synthetic_size]
     353            8 :                 .into_iter()
     354            8 :                 .flatten(),
     355            8 :         );
     356            8 :     }
     357              : }
     358              : 
     359              : /// Internal type to make timeline metric production testable.
     360              : ///
     361              : /// As this value type contains all of the information needed from a timeline to produce the
     362              : /// metrics, it can easily be created with different values in test.
     363              : struct TimelineSnapshot {
     364              :     loaded_at: (Lsn, SystemTime),
     365              :     last_record_lsn: Lsn,
     366              :     current_exact_logical_size: Option<u64>,
     367              : }
     368              : 
     369              : impl TimelineSnapshot {
     370              :     /// Collect the metrics from an actual timeline.
     371              :     ///
     372              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     373              :     ///
     374              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     375            0 :     fn collect(
     376            0 :         t: &Arc<crate::tenant::Timeline>,
     377            0 :         ctx: &RequestContext,
     378            0 :     ) -> anyhow::Result<Option<Self>> {
     379            0 :         if !t.is_active() {
     380              :             // no collection for broken or stopping needed, we will still keep the cached values
     381              :             // though at the caller.
     382            0 :             Ok(None)
     383              :         } else {
     384            0 :             let loaded_at = t.loaded_at;
     385            0 :             let last_record_lsn = t.get_last_record_lsn();
     386              : 
     387            0 :             let current_exact_logical_size = {
     388            0 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     389            0 :                 let size = span.in_scope(|| {
     390            0 :                     t.get_current_logical_size(
     391            0 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     392            0 :                         ctx,
     393            0 :                     )
     394            0 :                 });
     395            0 :                 match size {
     396              :                     // Only send timeline logical size when it is fully calculated.
     397            0 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     398            0 :                     CurrentLogicalSize::Approximate(_) => None,
     399              :                 }
     400              :             };
     401              : 
     402            0 :             Ok(Some(TimelineSnapshot {
     403            0 :                 loaded_at,
     404            0 :                 last_record_lsn,
     405            0 :                 current_exact_logical_size,
     406            0 :             }))
     407              :         }
     408            0 :     }
     409              : 
     410              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     411           24 :     fn to_metrics(
     412           24 :         &self,
     413           24 :         tenant_id: TenantId,
     414           24 :         timeline_id: TimelineId,
     415           24 :         now: DateTime<Utc>,
     416           24 :         metrics: &mut Vec<NewRawMetric>,
     417           24 :         cache: &Cache,
     418           24 :     ) {
     419           24 :         let timeline_written_size = u64::from(self.last_record_lsn);
     420           24 : 
     421           24 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     422           24 : 
     423           24 :         let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
     424           12 :             item.kind
     425           12 :                 .incremental_timerange()
     426           12 :                 .expect("never create EventType::Absolute for written_size_delta")
     427           12 :                 .end
     428           24 :         });
     429           24 : 
     430           24 :         let written_size_now =
     431           24 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     432           24 : 
     433           24 :         // by default, use the last sent written_size as the basis for
     434           24 :         // calculating the delta. if we don't yet have one, use the load time value.
     435           24 :         let prev: (DateTime<Utc>, u64) = cache
     436           24 :             .get(&written_size_now.key)
     437           24 :             .map(|item| {
     438           16 :                 // use the prev time from our last incremental update, or default to latest
     439           16 :                 // absolute update on the first round.
     440           16 :                 let prev_at = item
     441           16 :                     .kind
     442           16 :                     .absolute_time()
     443           16 :                     .expect("never create EventType::Incremental for written_size");
     444           16 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     445           16 :                 (*prev_at, item.value)
     446           24 :             })
     447           24 :             .unwrap_or_else(|| {
     448            8 :                 // if we don't have a previous point of comparison, compare to the load time
     449            8 :                 // lsn.
     450            8 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     451            8 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     452           24 :             });
     453           24 : 
     454           24 :         let up_to = now;
     455              : 
     456           24 :         if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
     457           16 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     458           16 :             // written_size_delta
     459           16 :             metrics.push(key_value);
     460           16 :             // written_size
     461           16 :             metrics.push(written_size_now);
     462           16 :         } else {
     463            8 :             // the cached value was ahead of us, report zero until we've caught up
     464            8 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     465            8 :             // the cached value was ahead of us, report the same until we've caught up
     466            8 :             metrics.push(NewRawMetric {
     467            8 :                 key: written_size_now.key,
     468            8 :                 kind: written_size_now.kind,
     469            8 :                 value: prev.1,
     470            8 :             });
     471            8 :         }
     472              : 
     473              :         {
     474           24 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     475           24 :             let current_or_previous = self
     476           24 :                 .current_exact_logical_size
     477           24 :                 .or_else(|| cache.get(factory.key()).map(|item| item.value));
     478              : 
     479           24 :             if let Some(size) = current_or_previous {
     480           16 :                 metrics.push(factory.at(now, size));
     481           16 :             }
     482              :         }
     483           24 :     }
     484              : }
     485              : 
     486              : #[cfg(test)]
     487              : mod tests;
     488              : 
     489              : #[cfg(test)]
     490              : pub(crate) use tests::{metric_examples, metric_examples_old};

Generated by: LCOV version 2.1-beta