LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit
Test: 86c536b7fe84b2afe03c3bb264199e9c319ae0f8.info Lines: 64.3 % 277 178
Test Date: 2024-06-24 16:38:41 Functions: 38.6 % 57 22

            Line data    Source code
       1              : use crate::tenant::mgr::TenantManager;
       2              : use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
       3              : use chrono::{DateTime, Utc};
       4              : use consumption_metrics::EventType;
       5              : use futures::stream::StreamExt;
       6              : use std::{sync::Arc, time::SystemTime};
       7              : use utils::{
       8              :     id::{TenantId, TimelineId},
       9              :     lsn::Lsn,
      10              : };
      11              : 
      12              : use super::{Cache, RawMetric};
      13              : 
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of static str.
///
/// The `#[serde(rename = ...)]` string on each variant is the name the metric is serialized
/// and deserialized under, so it is what every consumer of the serialized events sees.
// Do not rename any of these (neither the variants' serde names nor their meaning) without
// first consulting with data team and partner management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental (growth relative to the previously sent value)
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}
      39              : 
/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere.
///
/// The key is `Hash + Eq` so that previously sent values can be looked up by it in the `Cache`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    /// Tenant the metric belongs to.
    pub(super) tenant_id: TenantId,

    /// `Some` for per-timeline metrics, `None` for tenant-wide metrics. Omitted from the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    /// Which metric this key measures.
    pub(super) metric: Name,
}
      53              : 
      54              : impl MetricsKey {
      55           94 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      56           94 :         AbsoluteValueFactory(self)
      57           94 :     }
      58           30 :     const fn incremental_values(self) -> IncrementalValueFactory {
      59           30 :         IncrementalValueFactory(self)
      60           30 :     }
      61              : }
      62              : 
      63              : /// Helper type which each individual metric kind can return to produce only absolute values.
      64              : struct AbsoluteValueFactory(MetricsKey);
      65              : 
      66              : impl AbsoluteValueFactory {
      67           88 :     const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
      68           88 :         let key = self.0;
      69           88 :         (key, (EventType::Absolute { time }, val))
      70           88 :     }
      71              : 
      72           10 :     fn key(&self) -> &MetricsKey {
      73           10 :         &self.0
      74           10 :     }
      75              : }
      76              : 
      77              : /// Helper type which each individual metric kind can return to produce only incremental values.
      78              : struct IncrementalValueFactory(MetricsKey);
      79              : 
      80              : impl IncrementalValueFactory {
      81              :     #[allow(clippy::wrong_self_convention)]
      82           30 :     const fn from_until(
      83           30 :         self,
      84           30 :         prev_end: DateTime<Utc>,
      85           30 :         up_to: DateTime<Utc>,
      86           30 :         val: u64,
      87           30 :     ) -> RawMetric {
      88           30 :         let key = self.0;
      89           30 :         // cannot assert prev_end < up_to because these are realtime clock based
      90           30 :         let when = EventType::Incremental {
      91           30 :             start_time: prev_end,
      92           30 :             stop_time: up_to,
      93           30 :         };
      94           30 :         (key, (when, val))
      95           30 :     }
      96              : 
      97           12 :     fn key(&self) -> &MetricsKey {
      98           12 :         &self.0
      99           12 :     }
     100              : }
     101              : 
     102              : // the static part of a MetricsKey
     103              : impl MetricsKey {
     104              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     105              :     ///
     106              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     107           32 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     108           32 :         MetricsKey {
     109           32 :             tenant_id,
     110           32 :             timeline_id: Some(timeline_id),
     111           32 :             metric: Name::WrittenSize,
     112           32 :         }
     113           32 :         .absolute_values()
     114           32 :     }
     115              : 
     116              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     117              :     /// previously sent, starting from the previously sent incremental time range ending at the
     118              :     /// latest absolute measurement.
     119           30 :     const fn written_size_delta(
     120           30 :         tenant_id: TenantId,
     121           30 :         timeline_id: TimelineId,
     122           30 :     ) -> IncrementalValueFactory {
     123           30 :         MetricsKey {
     124           30 :             tenant_id,
     125           30 :             timeline_id: Some(timeline_id),
     126           30 :             metric: Name::WrittenSizeDelta,
     127           30 :         }
     128           30 :         .incremental_values()
     129           30 :     }
     130              : 
     131              :     /// Exact [`Timeline::get_current_logical_size`].
     132              :     ///
     133              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     134           26 :     const fn timeline_logical_size(
     135           26 :         tenant_id: TenantId,
     136           26 :         timeline_id: TimelineId,
     137           26 :     ) -> AbsoluteValueFactory {
     138           26 :         MetricsKey {
     139           26 :             tenant_id,
     140           26 :             timeline_id: Some(timeline_id),
     141           26 :             metric: Name::LogicalSize,
     142           26 :         }
     143           26 :         .absolute_values()
     144           26 :     }
     145              : 
     146              :     /// [`Tenant::remote_size`]
     147              :     ///
     148              :     /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     149           12 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     150           12 :         MetricsKey {
     151           12 :             tenant_id,
     152           12 :             timeline_id: None,
     153           12 :             metric: Name::RemoteSize,
     154           12 :         }
     155           12 :         .absolute_values()
     156           12 :     }
     157              : 
     158              :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     159              :     ///
     160              :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     161           12 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     162           12 :         MetricsKey {
     163           12 :             tenant_id,
     164           12 :             timeline_id: None,
     165           12 :             metric: Name::ResidentSize,
     166           12 :         }
     167           12 :         .absolute_values()
     168           12 :     }
     169              : 
     170              :     /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     171              :     ///
     172              :     /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     173              :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     174           12 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     175           12 :         MetricsKey {
     176           12 :             tenant_id,
     177           12 :             timeline_id: None,
     178           12 :             metric: Name::SyntheticSize,
     179           12 :         }
     180           12 :         .absolute_values()
     181           12 :     }
     182              : }
     183              : 
     184            0 : pub(super) async fn collect_all_metrics(
     185            0 :     tenant_manager: &Arc<TenantManager>,
     186            0 :     cached_metrics: &Cache,
     187            0 :     ctx: &RequestContext,
     188            0 : ) -> Vec<RawMetric> {
     189            0 :     use pageserver_api::models::TenantState;
     190            0 : 
     191            0 :     let started_at = std::time::Instant::now();
     192              : 
     193            0 :     let tenants = match tenant_manager.list_tenants() {
     194            0 :         Ok(tenants) => tenants,
     195            0 :         Err(err) => {
     196            0 :             tracing::error!("failed to list tenants: {:?}", err);
     197            0 :             return vec![];
     198              :         }
     199              :     };
     200              : 
     201            0 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
     202            0 :         if state != TenantState::Active || !id.is_shard_zero() {
     203            0 :             None
     204            0 :         } else {
     205            0 :             tenant_manager
     206            0 :                 .get_attached_tenant_shard(id)
     207            0 :                 .ok()
     208            0 :                 .map(|tenant| (id.tenant_id, tenant))
     209            0 :         }
     210            0 :     });
     211              : 
     212            0 :     let res = collect(tenants, cached_metrics, ctx).await;
     213              : 
     214            0 :     tracing::info!(
     215            0 :         elapsed_ms = started_at.elapsed().as_millis(),
     216            0 :         total = res.len(),
     217            0 :         "collected metrics"
     218              :     );
     219              : 
     220            0 :     res
     221            0 : }
     222              : 
     223            0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
     224            0 : where
     225            0 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
     226            0 : {
     227            0 :     let mut current_metrics: Vec<RawMetric> = Vec::new();
     228            0 : 
     229            0 :     let mut tenants = std::pin::pin!(tenants);
     230              : 
     231            0 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     232            0 :         let mut tenant_resident_size = 0;
     233              : 
     234            0 :         for timeline in tenant.list_timelines() {
     235            0 :             let timeline_id = timeline.timeline_id;
     236            0 : 
     237            0 :             match TimelineSnapshot::collect(&timeline, ctx) {
     238            0 :                 Ok(Some(snap)) => {
     239            0 :                     snap.to_metrics(
     240            0 :                         tenant_id,
     241            0 :                         timeline_id,
     242            0 :                         Utc::now(),
     243            0 :                         &mut current_metrics,
     244            0 :                         cache,
     245            0 :                     );
     246            0 :                 }
     247            0 :                 Ok(None) => {}
     248            0 :                 Err(e) => {
     249            0 :                     tracing::error!(
     250            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     251            0 :                         timeline.timeline_id
     252              :                     );
     253            0 :                     continue;
     254              :                 }
     255              :             }
     256              : 
     257            0 :             tenant_resident_size += timeline.resident_physical_size();
     258              :         }
     259              : 
     260            0 :         let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
     261            0 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     262              :     }
     263              : 
     264            0 :     current_metrics
     265            0 : }
     266              : 
     267              : /// In-between abstraction to allow testing metrics without actual Tenants.
     268              : struct TenantSnapshot {
     269              :     resident_size: u64,
     270              :     remote_size: u64,
     271              :     synthetic_size: u64,
     272              : }
     273              : 
     274              : impl TenantSnapshot {
     275              :     /// Collect tenant status to have metrics created out of it.
     276              :     ///
     277              :     /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
     278              :     /// cannot just list timelines here.
     279            0 :     fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
     280            0 :         TenantSnapshot {
     281            0 :             resident_size,
     282            0 :             remote_size: t.remote_size(),
     283            0 :             // Note that this metric is calculated in a separate bgworker
     284            0 :             // Here we only use cached value, which may lag behind the real latest one
     285            0 :             synthetic_size: t.cached_synthetic_size(),
     286            0 :         }
     287            0 :     }
     288              : 
     289            4 :     fn to_metrics(
     290            4 :         &self,
     291            4 :         tenant_id: TenantId,
     292            4 :         now: DateTime<Utc>,
     293            4 :         cached: &Cache,
     294            4 :         metrics: &mut Vec<RawMetric>,
     295            4 :     ) {
     296            4 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     297            4 : 
     298            4 :         let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
     299              : 
     300            4 :         let synthetic_size = {
     301            4 :             let factory = MetricsKey::synthetic_size(tenant_id);
     302            4 :             let mut synthetic_size = self.synthetic_size;
     303            4 : 
     304            4 :             if synthetic_size == 0 {
     305            4 :                 if let Some((_, value)) = cached.get(factory.key()) {
     306            2 :                     // use the latest value from previous session
     307            2 :                     synthetic_size = *value;
     308            2 :                 }
     309            0 :             }
     310              : 
     311            4 :             if synthetic_size != 0 {
     312              :                 // only send non-zeroes because otherwise these show up as errors in logs
     313            2 :                 Some(factory.at(now, synthetic_size))
     314              :             } else {
     315            2 :                 None
     316              :             }
     317              :         };
     318              : 
     319            4 :         metrics.extend(
     320            4 :             [Some(remote_size), Some(resident_size), synthetic_size]
     321            4 :                 .into_iter()
     322            4 :                 .flatten(),
     323            4 :         );
     324            4 :     }
     325              : }
     326              : 
     327              : /// Internal type to make timeline metric production testable.
     328              : ///
     329              : /// As this value type contains all of the information needed from a timeline to produce the
     330              : /// metrics, it can easily be created with different values in test.
     331              : struct TimelineSnapshot {
     332              :     loaded_at: (Lsn, SystemTime),
     333              :     last_record_lsn: Lsn,
     334              :     current_exact_logical_size: Option<u64>,
     335              : }
     336              : 
     337              : impl TimelineSnapshot {
     338              :     /// Collect the metrics from an actual timeline.
     339              :     ///
     340              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     341              :     ///
     342              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     343            0 :     fn collect(
     344            0 :         t: &Arc<crate::tenant::Timeline>,
     345            0 :         ctx: &RequestContext,
     346            0 :     ) -> anyhow::Result<Option<Self>> {
     347            0 :         if !t.is_active() {
     348              :             // no collection for broken or stopping needed, we will still keep the cached values
     349              :             // though at the caller.
     350            0 :             Ok(None)
     351              :         } else {
     352            0 :             let loaded_at = t.loaded_at;
     353            0 :             let last_record_lsn = t.get_last_record_lsn();
     354              : 
     355            0 :             let current_exact_logical_size = {
     356            0 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     357            0 :                 let size = span.in_scope(|| {
     358            0 :                     t.get_current_logical_size(
     359            0 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     360            0 :                         ctx,
     361            0 :                     )
     362            0 :                 });
     363            0 :                 match size {
     364              :                     // Only send timeline logical size when it is fully calculated.
     365            0 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     366            0 :                     CurrentLogicalSize::Approximate(_) => None,
     367              :                 }
     368              :             };
     369              : 
     370            0 :             Ok(Some(TimelineSnapshot {
     371            0 :                 loaded_at,
     372            0 :                 last_record_lsn,
     373            0 :                 current_exact_logical_size,
     374            0 :             }))
     375              :         }
     376            0 :     }
     377              : 
     378              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     379           12 :     fn to_metrics(
     380           12 :         &self,
     381           12 :         tenant_id: TenantId,
     382           12 :         timeline_id: TimelineId,
     383           12 :         now: DateTime<Utc>,
     384           12 :         metrics: &mut Vec<RawMetric>,
     385           12 :         cache: &Cache,
     386           12 :     ) {
     387           12 :         let timeline_written_size = u64::from(self.last_record_lsn);
     388           12 : 
     389           12 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     390           12 : 
     391           12 :         let last_stop_time = cache
     392           12 :             .get(written_size_delta_key.key())
     393           12 :             .map(|(until, _val)| {
     394            6 :                 until
     395            6 :                     .incremental_timerange()
     396            6 :                     .expect("never create EventType::Absolute for written_size_delta")
     397            6 :                     .end
     398           12 :             });
     399           12 : 
     400           12 :         let (key, written_size_now) =
     401           12 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     402           12 : 
     403           12 :         // by default, use the last sent written_size as the basis for
     404           12 :         // calculating the delta. if we don't yet have one, use the load time value.
     405           12 :         let prev = cache
     406           12 :             .get(&key)
     407           12 :             .map(|(prev_at, prev)| {
     408            8 :                 // use the prev time from our last incremental update, or default to latest
     409            8 :                 // absolute update on the first round.
     410            8 :                 let prev_at = prev_at
     411            8 :                     .absolute_time()
     412            8 :                     .expect("never create EventType::Incremental for written_size");
     413            8 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     414            8 :                 (*prev_at, *prev)
     415           12 :             })
     416           12 :             .unwrap_or_else(|| {
     417            4 :                 // if we don't have a previous point of comparison, compare to the load time
     418            4 :                 // lsn.
     419            4 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     420            4 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     421           12 :             });
     422           12 : 
     423           12 :         let up_to = now;
     424              : 
     425           12 :         if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
     426            8 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     427            8 :             // written_size_delta
     428            8 :             metrics.push(key_value);
     429            8 :             // written_size
     430            8 :             metrics.push((key, written_size_now));
     431            8 :         } else {
     432            4 :             // the cached value was ahead of us, report zero until we've caught up
     433            4 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     434            4 :             // the cached value was ahead of us, report the same until we've caught up
     435            4 :             metrics.push((key, (written_size_now.0, prev.1)));
     436            4 :         }
     437              : 
     438              :         {
     439           12 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     440           12 :             let current_or_previous = self
     441           12 :                 .current_exact_logical_size
     442           12 :                 .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
     443              : 
     444           12 :             if let Some(size) = current_or_previous {
     445            8 :                 metrics.push(factory.at(now, size));
     446            8 :             }
     447              :         }
     448           12 :     }
     449              : }
     450              : 
// Unit tests live in the `tests` submodule file; compiled only for test builds.
#[cfg(test)]
mod tests;

// Re-exported crate-wide for test builds only; presumably reused by other test code in the
// crate (verify against callers).
#[cfg(test)]
pub(crate) use tests::metric_examples;
        

Generated by: LCOV version 2.1-beta