LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 68.9 % 302 208
Test Date: 2025-01-30 15:18:43 Functions: 53.3 % 45 24

            Line data    Source code
       1              : use crate::tenant::mgr::TenantManager;
       2              : use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
       3              : use chrono::{DateTime, Utc};
       4              : use consumption_metrics::EventType;
       5              : use futures::stream::StreamExt;
       6              : use std::{sync::Arc, time::SystemTime};
       7              : use utils::{
       8              :     id::{TenantId, TimelineId},
       9              :     lsn::Lsn,
      10              : };
      11              : 
      12              : use super::{Cache, NewRawMetric};
      13              : 
      14              : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
      15              : /// instead of static str.
      16              : // Do not rename any of these without first consulting with data team and partner
      17              : // management.
      18          192 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      19              : pub(super) enum Name {
      20              :     /// Timeline last_record_lsn, absolute
      21              :     #[serde(rename = "written_size")]
      22              :     WrittenSize,
      23              :     /// Timeline last_record_lsn, incremental
      24              :     #[serde(rename = "written_data_bytes_delta")]
      25              :     WrittenSizeDelta,
      26              :     /// Timeline logical size
      27              :     #[serde(rename = "timeline_logical_size")]
      28              :     LogicalSize,
      29              :     /// Tenant remote size
      30              :     #[serde(rename = "remote_storage_size")]
      31              :     RemoteSize,
      32              :     /// Tenant resident size
      33              :     #[serde(rename = "resident_size")]
      34              :     ResidentSize,
      35              :     /// Tenant synthetic size
      36              :     #[serde(rename = "synthetic_storage_size")]
      37              :     SyntheticSize,
      38              : }
      39              : 
      40              : /// Key that uniquely identifies the object this metric describes.
      41              : ///
      42              : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
      43              : /// elsewhere.
      44          120 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      45              : pub(crate) struct MetricsKey {
      46              :     pub(super) tenant_id: TenantId,
      47              : 
      48              :     #[serde(skip_serializing_if = "Option::is_none")]
      49              :     pub(super) timeline_id: Option<TimelineId>,
      50              : 
      51              :     pub(super) metric: Name,
      52              : }
      53              : 
      54              : impl MetricsKey {
      55          228 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      56          228 :         AbsoluteValueFactory(self)
      57          228 :     }
      58           68 :     const fn incremental_values(self) -> IncrementalValueFactory {
      59           68 :         IncrementalValueFactory(self)
      60           68 :     }
      61              : }
      62              : 
      63              : /// Helper type which each individual metric kind can return to produce only absolute values.
      64              : struct AbsoluteValueFactory(MetricsKey);
      65              : 
      66              : impl AbsoluteValueFactory {
      67              :     #[cfg(test)]
      68           20 :     const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
      69           20 :         let key = self.0;
      70           20 :         (key, (EventType::Absolute { time }, val))
      71           20 :     }
      72              : 
      73          196 :     const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
      74          196 :         let key = self.0;
      75          196 :         NewRawMetric {
      76          196 :             key,
      77          196 :             kind: EventType::Absolute { time },
      78          196 :             value: val,
      79          196 :         }
      80          196 :     }
      81              : 
      82           20 :     fn key(&self) -> &MetricsKey {
      83           20 :         &self.0
      84           20 :     }
      85              : }
      86              : 
      87              : /// Helper type which each individual metric kind can return to produce only incremental values.
      88              : struct IncrementalValueFactory(MetricsKey);
      89              : 
      90              : impl IncrementalValueFactory {
      91              :     #[allow(clippy::wrong_self_convention)]
      92           64 :     const fn from_until(
      93           64 :         self,
      94           64 :         prev_end: DateTime<Utc>,
      95           64 :         up_to: DateTime<Utc>,
      96           64 :         val: u64,
      97           64 :     ) -> NewRawMetric {
      98           64 :         let key = self.0;
      99           64 :         // cannot assert prev_end < up_to because these are realtime clock based
     100           64 :         let when = EventType::Incremental {
     101           64 :             start_time: prev_end,
     102           64 :             stop_time: up_to,
     103           64 :         };
     104           64 :         NewRawMetric {
     105           64 :             key,
     106           64 :             kind: when,
     107           64 :             value: val,
     108           64 :         }
     109           64 :     }
     110              : 
     111              :     #[allow(clippy::wrong_self_convention)]
     112              :     #[cfg(test)]
     113            4 :     const fn from_until_old_format(
     114            4 :         self,
     115            4 :         prev_end: DateTime<Utc>,
     116            4 :         up_to: DateTime<Utc>,
     117            4 :         val: u64,
     118            4 :     ) -> super::RawMetric {
     119            4 :         let key = self.0;
     120            4 :         // cannot assert prev_end < up_to because these are realtime clock based
     121            4 :         let when = EventType::Incremental {
     122            4 :             start_time: prev_end,
     123            4 :             stop_time: up_to,
     124            4 :         };
     125            4 :         (key, (when, val))
     126            4 :     }
     127              : 
     128           24 :     fn key(&self) -> &MetricsKey {
     129           24 :         &self.0
     130           24 :     }
     131              : }
     132              : 
     133              : // the static part of a MetricsKey
     134              : impl MetricsKey {
     135              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     136              :     ///
     137              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     138           72 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     139           72 :         MetricsKey {
     140           72 :             tenant_id,
     141           72 :             timeline_id: Some(timeline_id),
     142           72 :             metric: Name::WrittenSize,
     143           72 :         }
     144           72 :         .absolute_values()
     145           72 :     }
     146              : 
     147              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     148              :     /// previously sent, starting from the previously sent incremental time range ending at the
     149              :     /// latest absolute measurement.
     150           68 :     const fn written_size_delta(
     151           68 :         tenant_id: TenantId,
     152           68 :         timeline_id: TimelineId,
     153           68 :     ) -> IncrementalValueFactory {
     154           68 :         MetricsKey {
     155           68 :             tenant_id,
     156           68 :             timeline_id: Some(timeline_id),
     157           68 :             metric: Name::WrittenSizeDelta,
     158           68 :         }
     159           68 :         .incremental_values()
     160           68 :     }
     161              : 
     162              :     /// Exact [`Timeline::get_current_logical_size`].
     163              :     ///
     164              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     165           60 :     const fn timeline_logical_size(
     166           60 :         tenant_id: TenantId,
     167           60 :         timeline_id: TimelineId,
     168           60 :     ) -> AbsoluteValueFactory {
     169           60 :         MetricsKey {
     170           60 :             tenant_id,
     171           60 :             timeline_id: Some(timeline_id),
     172           60 :             metric: Name::LogicalSize,
     173           60 :         }
     174           60 :         .absolute_values()
     175           60 :     }
     176              : 
     177              :     /// [`Tenant::remote_size`]
     178              :     ///
     179              :     /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     180           32 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     181           32 :         MetricsKey {
     182           32 :             tenant_id,
     183           32 :             timeline_id: None,
     184           32 :             metric: Name::RemoteSize,
     185           32 :         }
     186           32 :         .absolute_values()
     187           32 :     }
     188              : 
     189              :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     190              :     ///
     191              :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     192           32 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     193           32 :         MetricsKey {
     194           32 :             tenant_id,
     195           32 :             timeline_id: None,
     196           32 :             metric: Name::ResidentSize,
     197           32 :         }
     198           32 :         .absolute_values()
     199           32 :     }
     200              : 
     201              :     /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     202              :     ///
     203              :     /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     204              :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     205           32 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     206           32 :         MetricsKey {
     207           32 :             tenant_id,
     208           32 :             timeline_id: None,
     209           32 :             metric: Name::SyntheticSize,
     210           32 :         }
     211           32 :         .absolute_values()
     212           32 :     }
     213              : }
     214              : 
     215            0 : pub(super) async fn collect_all_metrics(
     216            0 :     tenant_manager: &Arc<TenantManager>,
     217            0 :     cached_metrics: &Cache,
     218            0 :     ctx: &RequestContext,
     219            0 : ) -> Vec<NewRawMetric> {
     220              :     use pageserver_api::models::TenantState;
     221              : 
     222            0 :     let started_at = std::time::Instant::now();
     223              : 
     224            0 :     let tenants = match tenant_manager.list_tenants() {
     225            0 :         Ok(tenants) => tenants,
     226            0 :         Err(err) => {
     227            0 :             tracing::error!("failed to list tenants: {:?}", err);
     228            0 :             return vec![];
     229              :         }
     230              :     };
     231              : 
     232            0 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
     233            0 :         if state != TenantState::Active || !id.is_shard_zero() {
     234            0 :             None
     235              :         } else {
     236            0 :             tenant_manager
     237            0 :                 .get_attached_tenant_shard(id)
     238            0 :                 .ok()
     239            0 :                 .map(|tenant| (id.tenant_id, tenant))
     240              :         }
     241            0 :     });
     242              : 
     243            0 :     let res = collect(tenants, cached_metrics, ctx).await;
     244              : 
     245            0 :     tracing::info!(
     246            0 :         elapsed_ms = started_at.elapsed().as_millis(),
     247            0 :         total = res.len(),
     248            0 :         "collected metrics"
     249              :     );
     250              : 
     251            0 :     res
     252            0 : }
     253              : 
     254            0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
     255            0 : where
     256            0 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
     257            0 : {
     258            0 :     let mut current_metrics: Vec<NewRawMetric> = Vec::new();
     259            0 : 
     260            0 :     let mut tenants = std::pin::pin!(tenants);
     261              : 
     262            0 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     263            0 :         let mut tenant_resident_size = 0;
     264              : 
     265            0 :         for timeline in tenant.list_timelines() {
     266            0 :             let timeline_id = timeline.timeline_id;
     267            0 : 
     268            0 :             match TimelineSnapshot::collect(&timeline, ctx) {
     269            0 :                 Ok(Some(snap)) => {
     270            0 :                     snap.to_metrics(
     271            0 :                         tenant_id,
     272            0 :                         timeline_id,
     273            0 :                         Utc::now(),
     274            0 :                         &mut current_metrics,
     275            0 :                         cache,
     276            0 :                     );
     277            0 :                 }
     278            0 :                 Ok(None) => {}
     279            0 :                 Err(e) => {
     280            0 :                     tracing::error!(
     281            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     282            0 :                         timeline.timeline_id
     283              :                     );
     284            0 :                     continue;
     285              :                 }
     286              :             }
     287              : 
     288            0 :             tenant_resident_size += timeline.resident_physical_size();
     289              :         }
     290              : 
     291            0 :         let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
     292            0 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     293              :     }
     294              : 
     295            0 :     current_metrics
     296            0 : }
     297              : 
     298              : /// In-between abstraction to allow testing metrics without actual Tenants.
     299              : struct TenantSnapshot {
     300              :     resident_size: u64,
     301              :     remote_size: u64,
     302              :     synthetic_size: u64,
     303              : }
     304              : 
     305              : impl TenantSnapshot {
     306              :     /// Collect tenant status to have metrics created out of it.
     307              :     ///
     308              :     /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
     309              :     /// cannot just list timelines here.
     310            0 :     fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
     311            0 :         TenantSnapshot {
     312            0 :             resident_size,
     313            0 :             remote_size: t.remote_size(),
     314            0 :             // Note that this metric is calculated in a separate bgworker
     315            0 :             // Here we only use cached value, which may lag behind the real latest one
     316            0 :             synthetic_size: t.cached_synthetic_size(),
     317            0 :         }
     318            0 :     }
     319              : 
     320            8 :     fn to_metrics(
     321            8 :         &self,
     322            8 :         tenant_id: TenantId,
     323            8 :         now: DateTime<Utc>,
     324            8 :         cached: &Cache,
     325            8 :         metrics: &mut Vec<NewRawMetric>,
     326            8 :     ) {
     327            8 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     328            8 : 
     329            8 :         let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
     330              : 
     331            8 :         let synthetic_size = {
     332            8 :             let factory = MetricsKey::synthetic_size(tenant_id);
     333            8 :             let mut synthetic_size = self.synthetic_size;
     334            8 : 
     335            8 :             if synthetic_size == 0 {
     336            8 :                 if let Some(item) = cached.get(factory.key()) {
     337            4 :                     // use the latest value from previous session, TODO: check generation number
     338            4 :                     synthetic_size = item.value;
     339            4 :                 }
     340            0 :             }
     341              : 
     342            8 :             if synthetic_size != 0 {
     343              :                 // only send non-zeroes because otherwise these show up as errors in logs
     344            4 :                 Some(factory.at(now, synthetic_size))
     345              :             } else {
     346            4 :                 None
     347              :             }
     348              :         };
     349              : 
     350            8 :         metrics.extend(
     351            8 :             [Some(remote_size), Some(resident_size), synthetic_size]
     352            8 :                 .into_iter()
     353            8 :                 .flatten(),
     354            8 :         );
     355            8 :     }
     356              : }
     357              : 
     358              : /// Internal type to make timeline metric production testable.
     359              : ///
     360              : /// As this value type contains all of the information needed from a timeline to produce the
     361              : /// metrics, it can easily be created with different values in test.
     362              : struct TimelineSnapshot {
     363              :     loaded_at: (Lsn, SystemTime),
     364              :     last_record_lsn: Lsn,
     365              :     current_exact_logical_size: Option<u64>,
     366              : }
     367              : 
     368              : impl TimelineSnapshot {
     369              :     /// Collect the metrics from an actual timeline.
     370              :     ///
     371              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     372              :     ///
     373              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     374            0 :     fn collect(
     375            0 :         t: &Arc<crate::tenant::Timeline>,
     376            0 :         ctx: &RequestContext,
     377            0 :     ) -> anyhow::Result<Option<Self>> {
     378            0 :         if !t.is_active() {
     379              :             // no collection for broken or stopping needed, we will still keep the cached values
     380              :             // though at the caller.
     381            0 :             Ok(None)
     382              :         } else {
     383            0 :             let loaded_at = t.loaded_at;
     384            0 :             let last_record_lsn = t.get_last_record_lsn();
     385              : 
     386            0 :             let current_exact_logical_size = {
     387            0 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     388            0 :                 let size = span.in_scope(|| {
     389            0 :                     t.get_current_logical_size(
     390            0 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     391            0 :                         ctx,
     392            0 :                     )
     393            0 :                 });
     394            0 :                 match size {
     395              :                     // Only send timeline logical size when it is fully calculated.
     396            0 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     397            0 :                     CurrentLogicalSize::Approximate(_) => None,
     398              :                 }
     399              :             };
     400              : 
     401            0 :             Ok(Some(TimelineSnapshot {
     402            0 :                 loaded_at,
     403            0 :                 last_record_lsn,
     404            0 :                 current_exact_logical_size,
     405            0 :             }))
     406              :         }
     407            0 :     }
     408              : 
     409              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     410           24 :     fn to_metrics(
     411           24 :         &self,
     412           24 :         tenant_id: TenantId,
     413           24 :         timeline_id: TimelineId,
     414           24 :         now: DateTime<Utc>,
     415           24 :         metrics: &mut Vec<NewRawMetric>,
     416           24 :         cache: &Cache,
     417           24 :     ) {
     418           24 :         let timeline_written_size = u64::from(self.last_record_lsn);
     419           24 : 
     420           24 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     421           24 : 
     422           24 :         let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
     423           12 :             item.kind
     424           12 :                 .incremental_timerange()
     425           12 :                 .expect("never create EventType::Absolute for written_size_delta")
     426           12 :                 .end
     427           24 :         });
     428           24 : 
     429           24 :         let written_size_now =
     430           24 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     431           24 : 
     432           24 :         // by default, use the last sent written_size as the basis for
     433           24 :         // calculating the delta. if we don't yet have one, use the load time value.
     434           24 :         let prev: (DateTime<Utc>, u64) = cache
     435           24 :             .get(&written_size_now.key)
     436           24 :             .map(|item| {
     437           16 :                 // use the prev time from our last incremental update, or default to latest
     438           16 :                 // absolute update on the first round.
     439           16 :                 let prev_at = item
     440           16 :                     .kind
     441           16 :                     .absolute_time()
     442           16 :                     .expect("never create EventType::Incremental for written_size");
     443           16 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     444           16 :                 (*prev_at, item.value)
     445           24 :             })
     446           24 :             .unwrap_or_else(|| {
     447            8 :                 // if we don't have a previous point of comparison, compare to the load time
     448            8 :                 // lsn.
     449            8 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     450            8 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     451           24 :             });
     452           24 : 
     453           24 :         let up_to = now;
     454              : 
     455           24 :         if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
     456           16 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     457           16 :             // written_size_delta
     458           16 :             metrics.push(key_value);
     459           16 :             // written_size
     460           16 :             metrics.push(written_size_now);
     461           16 :         } else {
     462            8 :             // the cached value was ahead of us, report zero until we've caught up
     463            8 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     464            8 :             // the cached value was ahead of us, report the same until we've caught up
     465            8 :             metrics.push(NewRawMetric {
     466            8 :                 key: written_size_now.key,
     467            8 :                 kind: written_size_now.kind,
     468            8 :                 value: prev.1,
     469            8 :             });
     470            8 :         }
     471              : 
     472              :         {
     473           24 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     474           24 :             let current_or_previous = self
     475           24 :                 .current_exact_logical_size
     476           24 :                 .or_else(|| cache.get(factory.key()).map(|item| item.value));
     477              : 
     478           24 :             if let Some(size) = current_or_previous {
     479           16 :                 metrics.push(factory.at(now, size));
     480           16 :             }
     481              :         }
     482           24 :     }
     483              : }
     484              : 
     485              : #[cfg(test)]
     486              : mod tests;
     487              : 
     488              : #[cfg(test)]
     489              : pub(crate) use tests::{metric_examples, metric_examples_old};
        

Generated by: LCOV version 2.1-beta