LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 67.3 % 309 208
Test Date: 2025-04-24 20:31:15 Functions: 55.8 % 43 24

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::time::SystemTime;
       3              : 
       4              : use chrono::{DateTime, Utc};
       5              : use consumption_metrics::EventType;
       6              : use futures::stream::StreamExt;
       7              : use utils::id::{TenantId, TimelineId};
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use super::{Cache, NewRawMetric};
      11              : use crate::context::RequestContext;
      12              : use crate::tenant::mgr::TenantManager;
      13              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      14              : 
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of static str.
///
/// Every variant is (de)serialized under the string given in its `#[serde(rename = "...")]`
/// attribute, not under its Rust identifier, so the Rust names can differ from the wire names.
// Do not rename any of these without first consulting with data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute; serialized as `written_size`.
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental; serialized as `written_data_bytes_delta`.
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size; serialized as `timeline_logical_size`.
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size; serialized as `remote_storage_size`.
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size; serialized as `resident_size`.
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size; serialized as `synthetic_storage_size`.
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}
      40              : 
      41              : /// Key that uniquely identifies the object this metric describes.
      42              : ///
      43              : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
      44              : /// elsewhere.
      45          360 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      46              : pub(crate) struct MetricsKey {
      47              :     pub(super) tenant_id: TenantId,
      48              : 
      49              :     #[serde(skip_serializing_if = "Option::is_none")]
      50              :     pub(super) timeline_id: Option<TimelineId>,
      51              : 
      52              :     pub(super) metric: Name,
      53              : }
      54              : 
      55              : impl MetricsKey {
      56          684 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      57          684 :         AbsoluteValueFactory(self)
      58          684 :     }
      59          204 :     const fn incremental_values(self) -> IncrementalValueFactory {
      60          204 :         IncrementalValueFactory(self)
      61          204 :     }
      62              : }
      63              : 
      64              : /// Helper type which each individual metric kind can return to produce only absolute values.
      65              : struct AbsoluteValueFactory(MetricsKey);
      66              : 
      67              : impl AbsoluteValueFactory {
      68              :     #[cfg(test)]
      69           60 :     const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
      70           60 :         let key = self.0;
      71           60 :         (key, (EventType::Absolute { time }, val))
      72           60 :     }
      73              : 
      74          588 :     const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
      75          588 :         let key = self.0;
      76          588 :         NewRawMetric {
      77          588 :             key,
      78          588 :             kind: EventType::Absolute { time },
      79          588 :             value: val,
      80          588 :         }
      81          588 :     }
      82              : 
      83           60 :     fn key(&self) -> &MetricsKey {
      84           60 :         &self.0
      85           60 :     }
      86              : }
      87              : 
      88              : /// Helper type which each individual metric kind can return to produce only incremental values.
      89              : struct IncrementalValueFactory(MetricsKey);
      90              : 
      91              : impl IncrementalValueFactory {
      92              :     #[allow(clippy::wrong_self_convention)]
      93          192 :     const fn from_until(
      94          192 :         self,
      95          192 :         prev_end: DateTime<Utc>,
      96          192 :         up_to: DateTime<Utc>,
      97          192 :         val: u64,
      98          192 :     ) -> NewRawMetric {
      99          192 :         let key = self.0;
     100          192 :         // cannot assert prev_end < up_to because these are realtime clock based
     101          192 :         let when = EventType::Incremental {
     102          192 :             start_time: prev_end,
     103          192 :             stop_time: up_to,
     104          192 :         };
     105          192 :         NewRawMetric {
     106          192 :             key,
     107          192 :             kind: when,
     108          192 :             value: val,
     109          192 :         }
     110          192 :     }
     111              : 
     112              :     #[allow(clippy::wrong_self_convention)]
     113              :     #[cfg(test)]
     114           12 :     const fn from_until_old_format(
     115           12 :         self,
     116           12 :         prev_end: DateTime<Utc>,
     117           12 :         up_to: DateTime<Utc>,
     118           12 :         val: u64,
     119           12 :     ) -> super::RawMetric {
     120           12 :         let key = self.0;
     121           12 :         // cannot assert prev_end < up_to because these are realtime clock based
     122           12 :         let when = EventType::Incremental {
     123           12 :             start_time: prev_end,
     124           12 :             stop_time: up_to,
     125           12 :         };
     126           12 :         (key, (when, val))
     127           12 :     }
     128              : 
     129           72 :     fn key(&self) -> &MetricsKey {
     130           72 :         &self.0
     131           72 :     }
     132              : }
     133              : 
     134              : // the static part of a MetricsKey
     135              : impl MetricsKey {
     136              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     137              :     ///
     138              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     139          216 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     140          216 :         MetricsKey {
     141          216 :             tenant_id,
     142          216 :             timeline_id: Some(timeline_id),
     143          216 :             metric: Name::WrittenSize,
     144          216 :         }
     145          216 :         .absolute_values()
     146          216 :     }
     147              : 
     148              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     149              :     /// previously sent, starting from the previously sent incremental time range ending at the
     150              :     /// latest absolute measurement.
     151          204 :     const fn written_size_delta(
     152          204 :         tenant_id: TenantId,
     153          204 :         timeline_id: TimelineId,
     154          204 :     ) -> IncrementalValueFactory {
     155          204 :         MetricsKey {
     156          204 :             tenant_id,
     157          204 :             timeline_id: Some(timeline_id),
     158          204 :             metric: Name::WrittenSizeDelta,
     159          204 :         }
     160          204 :         .incremental_values()
     161          204 :     }
     162              : 
     163              :     /// Exact [`Timeline::get_current_logical_size`].
     164              :     ///
     165              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     166          180 :     const fn timeline_logical_size(
     167          180 :         tenant_id: TenantId,
     168          180 :         timeline_id: TimelineId,
     169          180 :     ) -> AbsoluteValueFactory {
     170          180 :         MetricsKey {
     171          180 :             tenant_id,
     172          180 :             timeline_id: Some(timeline_id),
     173          180 :             metric: Name::LogicalSize,
     174          180 :         }
     175          180 :         .absolute_values()
     176          180 :     }
     177              : 
     178              :     /// [`TenantShard::remote_size`]
     179              :     ///
     180              :     /// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
     181           96 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     182           96 :         MetricsKey {
     183           96 :             tenant_id,
     184           96 :             timeline_id: None,
     185           96 :             metric: Name::RemoteSize,
     186           96 :         }
     187           96 :         .absolute_values()
     188           96 :     }
     189              : 
     190              :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     191              :     ///
     192              :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     193           96 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     194           96 :         MetricsKey {
     195           96 :             tenant_id,
     196           96 :             timeline_id: None,
     197           96 :             metric: Name::ResidentSize,
     198           96 :         }
     199           96 :         .absolute_values()
     200           96 :     }
     201              : 
     202              :     /// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     203              :     ///
     204              :     /// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
     205              :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     206           96 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     207           96 :         MetricsKey {
     208           96 :             tenant_id,
     209           96 :             timeline_id: None,
     210           96 :             metric: Name::SyntheticSize,
     211           96 :         }
     212           96 :         .absolute_values()
     213           96 :     }
     214              : }
     215              : 
     216            0 : pub(super) async fn collect_all_metrics(
     217            0 :     tenant_manager: &Arc<TenantManager>,
     218            0 :     cached_metrics: &Cache,
     219            0 :     ctx: &RequestContext,
     220            0 : ) -> Vec<NewRawMetric> {
     221              :     use pageserver_api::models::TenantState;
     222              : 
     223            0 :     let started_at = std::time::Instant::now();
     224              : 
     225            0 :     let tenants = match tenant_manager.list_tenants() {
     226            0 :         Ok(tenants) => tenants,
     227            0 :         Err(err) => {
     228            0 :             tracing::error!("failed to list tenants: {:?}", err);
     229            0 :             return vec![];
     230              :         }
     231              :     };
     232              : 
     233            0 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
     234            0 :         if state != TenantState::Active || !id.is_shard_zero() {
     235            0 :             None
     236              :         } else {
     237            0 :             tenant_manager
     238            0 :                 .get_attached_tenant_shard(id)
     239            0 :                 .ok()
     240            0 :                 .map(|tenant| (id.tenant_id, tenant))
     241              :         }
     242            0 :     });
     243              : 
     244            0 :     let res = collect(tenants, cached_metrics, ctx).await;
     245              : 
     246            0 :     tracing::info!(
     247            0 :         elapsed_ms = started_at.elapsed().as_millis(),
     248            0 :         total = res.len(),
     249            0 :         "collected metrics"
     250              :     );
     251              : 
     252            0 :     res
     253            0 : }
     254              : 
     255            0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
     256            0 : where
     257            0 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
     258            0 : {
     259            0 :     let mut current_metrics: Vec<NewRawMetric> = Vec::new();
     260            0 : 
     261            0 :     let mut tenants = std::pin::pin!(tenants);
     262              : 
     263            0 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     264            0 :         let mut tenant_resident_size = 0;
     265            0 : 
     266            0 :         let timelines = tenant.list_timelines();
     267            0 :         let timelines_len = timelines.len();
     268            0 :         for timeline in timelines {
     269            0 :             let timeline_id = timeline.timeline_id;
     270            0 : 
     271            0 :             match TimelineSnapshot::collect(&timeline, ctx) {
     272            0 :                 Ok(Some(snap)) => {
     273            0 :                     snap.to_metrics(
     274            0 :                         tenant_id,
     275            0 :                         timeline_id,
     276            0 :                         Utc::now(),
     277            0 :                         &mut current_metrics,
     278            0 :                         cache,
     279            0 :                     );
     280            0 :                 }
     281            0 :                 Ok(None) => {}
     282            0 :                 Err(e) => {
     283            0 :                     tracing::error!(
     284            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     285            0 :                         timeline.timeline_id
     286              :                     );
     287            0 :                     continue;
     288              :                 }
     289              :             }
     290              : 
     291            0 :             tenant_resident_size += timeline.resident_physical_size();
     292              :         }
     293              : 
     294            0 :         if timelines_len == 0 {
     295            0 :             // Force set it to 1 byte to avoid not being reported -- all timelines are offloaded.
     296            0 :             tenant_resident_size = 1;
     297            0 :         }
     298              : 
     299            0 :         let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
     300            0 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     301              :     }
     302              : 
     303            0 :     current_metrics
     304            0 : }
     305              : 
     306              : /// In-between abstraction to allow testing metrics without actual Tenants.
     307              : struct TenantSnapshot {
     308              :     resident_size: u64,
     309              :     remote_size: u64,
     310              :     synthetic_size: u64,
     311              : }
     312              : 
     313              : impl TenantSnapshot {
     314              :     /// Collect tenant status to have metrics created out of it.
     315              :     ///
     316              :     /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
     317              :     /// cannot just list timelines here.
     318            0 :     fn collect(t: &Arc<crate::tenant::TenantShard>, resident_size: u64) -> Self {
     319            0 :         TenantSnapshot {
     320            0 :             resident_size,
     321            0 :             remote_size: t.remote_size(),
     322            0 :             // Note that this metric is calculated in a separate bgworker
     323            0 :             // Here we only use cached value, which may lag behind the real latest one
     324            0 :             synthetic_size: t.cached_synthetic_size(),
     325            0 :         }
     326            0 :     }
     327              : 
     328           24 :     fn to_metrics(
     329           24 :         &self,
     330           24 :         tenant_id: TenantId,
     331           24 :         now: DateTime<Utc>,
     332           24 :         cached: &Cache,
     333           24 :         metrics: &mut Vec<NewRawMetric>,
     334           24 :     ) {
     335           24 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     336           24 : 
     337           24 :         let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
     338              : 
     339           24 :         let synthetic_size = {
     340           24 :             let factory = MetricsKey::synthetic_size(tenant_id);
     341           24 :             let mut synthetic_size = self.synthetic_size;
     342           24 : 
     343           24 :             if synthetic_size == 0 {
     344           24 :                 if let Some(item) = cached.get(factory.key()) {
     345           12 :                     // use the latest value from previous session, TODO: check generation number
     346           12 :                     synthetic_size = item.value;
     347           12 :                 }
     348            0 :             }
     349              : 
     350           24 :             if synthetic_size != 0 {
     351              :                 // only send non-zeroes because otherwise these show up as errors in logs
     352           12 :                 Some(factory.at(now, synthetic_size))
     353              :             } else {
     354           12 :                 None
     355              :             }
     356              :         };
     357              : 
     358           24 :         metrics.extend(
     359           24 :             [Some(remote_size), Some(resident_size), synthetic_size]
     360           24 :                 .into_iter()
     361           24 :                 .flatten(),
     362           24 :         );
     363           24 :     }
     364              : }
     365              : 
     366              : /// Internal type to make timeline metric production testable.
     367              : ///
     368              : /// As this value type contains all of the information needed from a timeline to produce the
     369              : /// metrics, it can easily be created with different values in test.
     370              : struct TimelineSnapshot {
     371              :     loaded_at: (Lsn, SystemTime),
     372              :     last_record_lsn: Lsn,
     373              :     current_exact_logical_size: Option<u64>,
     374              : }
     375              : 
     376              : impl TimelineSnapshot {
     377              :     /// Collect the metrics from an actual timeline.
     378              :     ///
     379              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     380              :     ///
     381              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     382            0 :     fn collect(
     383            0 :         t: &Arc<crate::tenant::Timeline>,
     384            0 :         ctx: &RequestContext,
     385            0 :     ) -> anyhow::Result<Option<Self>> {
     386            0 :         if !t.is_active() {
     387              :             // no collection for broken or stopping needed, we will still keep the cached values
     388              :             // though at the caller.
     389            0 :             Ok(None)
     390              :         } else {
     391            0 :             let loaded_at = t.loaded_at;
     392            0 :             let last_record_lsn = t.get_last_record_lsn();
     393              : 
     394            0 :             let current_exact_logical_size = {
     395            0 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     396            0 :                 let size = span.in_scope(|| {
     397            0 :                     t.get_current_logical_size(
     398            0 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     399            0 :                         ctx,
     400            0 :                     )
     401            0 :                 });
     402            0 :                 match size {
     403              :                     // Only send timeline logical size when it is fully calculated.
     404            0 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     405            0 :                     CurrentLogicalSize::Approximate(_) => None,
     406              :                 }
     407              :             };
     408              : 
     409            0 :             Ok(Some(TimelineSnapshot {
     410            0 :                 loaded_at,
     411            0 :                 last_record_lsn,
     412            0 :                 current_exact_logical_size,
     413            0 :             }))
     414              :         }
     415            0 :     }
     416              : 
     417              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     418           72 :     fn to_metrics(
     419           72 :         &self,
     420           72 :         tenant_id: TenantId,
     421           72 :         timeline_id: TimelineId,
     422           72 :         now: DateTime<Utc>,
     423           72 :         metrics: &mut Vec<NewRawMetric>,
     424           72 :         cache: &Cache,
     425           72 :     ) {
     426           72 :         let timeline_written_size = u64::from(self.last_record_lsn);
     427           72 : 
     428           72 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     429           72 : 
     430           72 :         let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
     431           36 :             item.kind
     432           36 :                 .incremental_timerange()
     433           36 :                 .expect("never create EventType::Absolute for written_size_delta")
     434           36 :                 .end
     435           72 :         });
     436           72 : 
     437           72 :         let written_size_now =
     438           72 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     439           72 : 
     440           72 :         // by default, use the last sent written_size as the basis for
     441           72 :         // calculating the delta. if we don't yet have one, use the load time value.
     442           72 :         let prev: (DateTime<Utc>, u64) = cache
     443           72 :             .get(&written_size_now.key)
     444           72 :             .map(|item| {
     445           48 :                 // use the prev time from our last incremental update, or default to latest
     446           48 :                 // absolute update on the first round.
     447           48 :                 let prev_at = item
     448           48 :                     .kind
     449           48 :                     .absolute_time()
     450           48 :                     .expect("never create EventType::Incremental for written_size");
     451           48 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     452           48 :                 (*prev_at, item.value)
     453           72 :             })
     454           72 :             .unwrap_or_else(|| {
     455           24 :                 // if we don't have a previous point of comparison, compare to the load time
     456           24 :                 // lsn.
     457           24 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     458           24 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     459           72 :             });
     460           72 : 
     461           72 :         let up_to = now;
     462              : 
     463           72 :         if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
     464           48 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     465           48 :             // written_size_delta
     466           48 :             metrics.push(key_value);
     467           48 :             // written_size
     468           48 :             metrics.push(written_size_now);
     469           48 :         } else {
     470           24 :             // the cached value was ahead of us, report zero until we've caught up
     471           24 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     472           24 :             // the cached value was ahead of us, report the same until we've caught up
     473           24 :             metrics.push(NewRawMetric {
     474           24 :                 key: written_size_now.key,
     475           24 :                 kind: written_size_now.kind,
     476           24 :                 value: prev.1,
     477           24 :             });
     478           24 :         }
     479              : 
     480              :         {
     481           72 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     482           72 :             let current_or_previous = self
     483           72 :                 .current_exact_logical_size
     484           72 :                 .or_else(|| cache.get(factory.key()).map(|item| item.value));
     485              : 
     486           72 :             if let Some(size) = current_or_previous {
     487           48 :                 metrics.push(factory.at(now, size));
     488           48 :             }
     489              :         }
     490           72 :     }
     491              : }
     492              : 
     493              : #[cfg(test)]
     494              : mod tests;
     495              : 
     496              : #[cfg(test)]
     497              : pub(crate) use tests::{metric_examples, metric_examples_old};
        

Generated by: LCOV version 2.1-beta