LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 95.3 % 275 262
Test Date: 2024-02-07 07:37:29 Functions: 59.3 % 81 48

            Line data    Source code
       1              : use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
       2              : use chrono::{DateTime, Utc};
       3              : use consumption_metrics::EventType;
       4              : use futures::stream::StreamExt;
       5              : use std::{sync::Arc, time::SystemTime};
       6              : use utils::{
       7              :     id::{TenantId, TimelineId},
       8              :     lsn::Lsn,
       9              : };
      10              : 
      11              : use super::{Cache, RawMetric};
      12              : 
      13              : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
      14              : /// instead of static str.
      15              : // Do not rename any of these without first consulting with data team and partner
      16              : // management.
      17          284 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      18              : pub(super) enum Name {
      19              :     /// Timeline last_record_lsn, absolute
      20              :     #[serde(rename = "written_size")]
      21              :     WrittenSize,
      22              :     /// Timeline last_record_lsn, incremental
      23              :     #[serde(rename = "written_data_bytes_delta")]
      24              :     WrittenSizeDelta,
      25              :     /// Timeline logical size
      26              :     #[serde(rename = "timeline_logical_size")]
      27              :     LogicalSize,
      28              :     /// Tenant remote size
      29              :     #[serde(rename = "remote_storage_size")]
      30              :     RemoteSize,
      31              :     /// Tenant resident size
      32              :     #[serde(rename = "resident_size")]
      33              :     ResidentSize,
      34              :     /// Tenant synthetic size
      35              :     #[serde(rename = "synthetic_storage_size")]
      36              :     SyntheticSize,
      37              : }
      38              : 
      39              : /// Key that uniquely identifies the object this metric describes.
      40              : ///
      41              : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
      42              : /// elsewhere.
      43          196 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      44              : pub(crate) struct MetricsKey {
      45              :     pub(super) tenant_id: TenantId,
      46              : 
      47              :     #[serde(skip_serializing_if = "Option::is_none")]
      48              :     pub(super) timeline_id: Option<TimelineId>,
      49              : 
      50              :     pub(super) metric: Name,
      51              : }
      52              : 
      53              : impl MetricsKey {
      54          193 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      55          193 :         AbsoluteValueFactory(self)
      56          193 :     }
      57           45 :     const fn incremental_values(self) -> IncrementalValueFactory {
      58           45 :         IncrementalValueFactory(self)
      59           45 :     }
      60              : }
      61              : 
      62              : /// Helper type which each individual metric kind can return to produce only absolute values.
      63              : struct AbsoluteValueFactory(MetricsKey);
      64              : 
      65              : impl AbsoluteValueFactory {
      66          173 :     const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
      67          173 :         let key = self.0;
      68          173 :         (key, (EventType::Absolute { time }, val))
      69          173 :     }
      70              : 
      71           26 :     fn key(&self) -> &MetricsKey {
      72           26 :         &self.0
      73           26 :     }
      74              : }
      75              : 
      76              : /// Helper type which each individual metric kind can return to produce only incremental values.
      77              : struct IncrementalValueFactory(MetricsKey);
      78              : 
      79              : impl IncrementalValueFactory {
      80              :     #[allow(clippy::wrong_self_convention)]
      81           45 :     const fn from_until(
      82           45 :         self,
      83           45 :         prev_end: DateTime<Utc>,
      84           45 :         up_to: DateTime<Utc>,
      85           45 :         val: u64,
      86           45 :     ) -> RawMetric {
      87           45 :         let key = self.0;
      88           45 :         // cannot assert prev_end < up_to because these are realtime clock based
      89           45 :         let when = EventType::Incremental {
      90           45 :             start_time: prev_end,
      91           45 :             stop_time: up_to,
      92           45 :         };
      93           45 :         (key, (when, val))
      94           45 :     }
      95              : 
      96           27 :     fn key(&self) -> &MetricsKey {
      97           27 :         &self.0
      98           27 :     }
      99              : }
     100              : 
     101              : // the static part of a MetricsKey
     102              : impl MetricsKey {
     103              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     104              :     ///
     105              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     106           47 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     107           47 :         MetricsKey {
     108           47 :             tenant_id,
     109           47 :             timeline_id: Some(timeline_id),
     110           47 :             metric: Name::WrittenSize,
     111           47 :         }
     112           47 :         .absolute_values()
     113           47 :     }
     114              : 
     115              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     116              :     /// previously sent, starting from the previously sent incremental time range ending at the
     117              :     /// latest absolute measurement.
     118           45 :     const fn written_size_delta(
     119           45 :         tenant_id: TenantId,
     120           45 :         timeline_id: TimelineId,
     121           45 :     ) -> IncrementalValueFactory {
     122           45 :         MetricsKey {
     123           45 :             tenant_id,
     124           45 :             timeline_id: Some(timeline_id),
     125           45 :             metric: Name::WrittenSizeDelta,
     126           45 :         }
     127           45 :         .incremental_values()
     128           45 :     }
     129              : 
     130              :     /// Exact [`Timeline::get_current_logical_size`].
     131              :     ///
     132              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     133           41 :     const fn timeline_logical_size(
     134           41 :         tenant_id: TenantId,
     135           41 :         timeline_id: TimelineId,
     136           41 :     ) -> AbsoluteValueFactory {
     137           41 :         MetricsKey {
     138           41 :             tenant_id,
     139           41 :             timeline_id: Some(timeline_id),
     140           41 :             metric: Name::LogicalSize,
     141           41 :         }
     142           41 :         .absolute_values()
     143           41 :     }
     144              : 
     145              :     /// [`Tenant::remote_size`]
     146              :     ///
     147              :     /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     148           35 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     149           35 :         MetricsKey {
     150           35 :             tenant_id,
     151           35 :             timeline_id: None,
     152           35 :             metric: Name::RemoteSize,
     153           35 :         }
     154           35 :         .absolute_values()
     155           35 :     }
     156              : 
     157              :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     158              :     ///
     159              :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     160           35 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     161           35 :         MetricsKey {
     162           35 :             tenant_id,
     163           35 :             timeline_id: None,
     164           35 :             metric: Name::ResidentSize,
     165           35 :         }
     166           35 :         .absolute_values()
     167           35 :     }
     168              : 
     169              :     /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     170              :     ///
     171              :     /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     172              :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     173           35 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     174           35 :         MetricsKey {
     175           35 :             tenant_id,
     176           35 :             timeline_id: None,
     177           35 :             metric: Name::SyntheticSize,
     178           35 :         }
     179           35 :         .absolute_values()
     180           35 :     }
     181              : }
     182              : 
     183           26 : pub(super) async fn collect_all_metrics(
     184           26 :     cached_metrics: &Cache,
     185           26 :     ctx: &RequestContext,
     186           26 : ) -> Vec<RawMetric> {
     187           26 :     use pageserver_api::models::TenantState;
     188           26 : 
     189           26 :     let started_at = std::time::Instant::now();
     190              : 
     191           26 :     let tenants = match crate::tenant::mgr::list_tenants().await {
     192           26 :         Ok(tenants) => tenants,
     193            0 :         Err(err) => {
     194            0 :             tracing::error!("failed to list tenants: {:?}", err);
     195            0 :             return vec![];
     196              :         }
     197              :     };
     198              : 
     199           26 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
     200           23 :         if state != TenantState::Active || !id.is_zero() {
     201            0 :             None
     202              :         } else {
     203           23 :             crate::tenant::mgr::get_tenant(id, true)
     204           23 :                 .ok()
     205           23 :                 .map(|tenant| (id.tenant_id, tenant))
     206              :         }
     207           26 :     });
     208              : 
     209           26 :     let res = collect(tenants, cached_metrics, ctx).await;
     210              : 
     211           26 :     tracing::info!(
     212           26 :         elapsed_ms = started_at.elapsed().as_millis(),
     213           26 :         total = res.len(),
     214           26 :         "collected metrics"
     215           26 :     );
     216              : 
     217           26 :     res
     218           26 : }
     219              : 
     220           26 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
     221           26 : where
     222           26 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
     223           26 : {
     224           26 :     let mut current_metrics: Vec<RawMetric> = Vec::new();
     225           26 : 
     226           26 :     let mut tenants = std::pin::pin!(tenants);
     227              : 
     228           49 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     229           23 :         let mut tenant_resident_size = 0;
     230              : 
     231           23 :         for timeline in tenant.list_timelines() {
     232           15 :             let timeline_id = timeline.timeline_id;
     233           15 : 
     234           15 :             match TimelineSnapshot::collect(&timeline, ctx) {
     235           15 :                 Ok(Some(snap)) => {
     236           15 :                     snap.to_metrics(
     237           15 :                         tenant_id,
     238           15 :                         timeline_id,
     239           15 :                         Utc::now(),
     240           15 :                         &mut current_metrics,
     241           15 :                         cache,
     242           15 :                     );
     243           15 :                 }
     244            0 :                 Ok(None) => {}
     245            0 :                 Err(e) => {
     246            0 :                     tracing::error!(
     247            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     248            0 :                         timeline.timeline_id
     249            0 :                     );
     250            0 :                     continue;
     251              :                 }
     252              :             }
     253              : 
     254           15 :             tenant_resident_size += timeline.resident_physical_size();
     255              :         }
     256              : 
     257           23 :         let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
     258           23 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     259              :     }
     260              : 
     261           26 :     current_metrics
     262           26 : }
     263              : 
     264              : /// In-between abstraction to allow testing metrics without actual Tenants.
     265              : struct TenantSnapshot {
     266              :     resident_size: u64,
     267              :     remote_size: u64,
     268              :     synthetic_size: u64,
     269              : }
     270              : 
     271              : impl TenantSnapshot {
     272              :     /// Collect tenant status to have metrics created out of it.
     273              :     ///
     274              :     /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
     275              :     /// cannot just list timelines here.
     276           23 :     fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
     277           23 :         TenantSnapshot {
     278           23 :             resident_size,
     279           23 :             remote_size: t.remote_size(),
     280           23 :             // Note that this metric is calculated in a separate bgworker
     281           23 :             // Here we only use cached value, which may lag behind the real latest one
     282           23 :             synthetic_size: t.cached_synthetic_size(),
     283           23 :         }
     284           23 :     }
     285              : 
     286           27 :     fn to_metrics(
     287           27 :         &self,
     288           27 :         tenant_id: TenantId,
     289           27 :         now: DateTime<Utc>,
     290           27 :         cached: &Cache,
     291           27 :         metrics: &mut Vec<RawMetric>,
     292           27 :     ) {
     293           27 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     294           27 : 
     295           27 :         let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
     296              : 
     297           27 :         let synthetic_size = {
     298           27 :             let factory = MetricsKey::synthetic_size(tenant_id);
     299           27 :             let mut synthetic_size = self.synthetic_size;
     300           27 : 
     301           27 :             if synthetic_size == 0 {
     302           20 :                 if let Some((_, value)) = cached.get(factory.key()) {
     303            4 :                     // use the latest value from previous session
     304            4 :                     synthetic_size = *value;
     305           16 :                 }
     306            7 :             }
     307              : 
     308           27 :             if synthetic_size != 0 {
     309              :                 // only send non-zeroes because otherwise these show up as errors in logs
     310           11 :                 Some(factory.at(now, synthetic_size))
     311              :             } else {
     312           16 :                 None
     313              :             }
     314              :         };
     315              : 
     316           27 :         metrics.extend(
     317           27 :             [Some(remote_size), Some(resident_size), synthetic_size]
     318           27 :                 .into_iter()
     319           27 :                 .flatten(),
     320           27 :         );
     321           27 :     }
     322              : }
     323              : 
     324              : /// Internal type to make timeline metric production testable.
     325              : ///
     326              : /// As this value type contains all of the information needed from a timeline to produce the
     327              : /// metrics, it can easily be created with different values in test.
     328              : struct TimelineSnapshot {
     329              :     loaded_at: (Lsn, SystemTime),
     330              :     last_record_lsn: Lsn,
     331              :     current_exact_logical_size: Option<u64>,
     332              : }
     333              : 
     334              : impl TimelineSnapshot {
     335              :     /// Collect the metrics from an actual timeline.
     336              :     ///
     337              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     338              :     ///
     339              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     340           15 :     fn collect(
     341           15 :         t: &Arc<crate::tenant::Timeline>,
     342           15 :         ctx: &RequestContext,
     343           15 :     ) -> anyhow::Result<Option<Self>> {
     344           15 :         if !t.is_active() {
     345              :             // no collection for broken or stopping needed, we will still keep the cached values
     346              :             // though at the caller.
     347            0 :             Ok(None)
     348              :         } else {
     349           15 :             let loaded_at = t.loaded_at;
     350           15 :             let last_record_lsn = t.get_last_record_lsn();
     351              : 
     352           15 :             let current_exact_logical_size = {
     353           15 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     354           15 :                 let size = span.in_scope(|| {
     355           15 :                     t.get_current_logical_size(
     356           15 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     357           15 :                         ctx,
     358           15 :                     )
     359           15 :                 });
     360           15 :                 match size {
     361              :                     // Only send timeline logical size when it is fully calculated.
     362           15 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     363            0 :                     CurrentLogicalSize::Approximate(_) => None,
     364              :                 }
     365              :             };
     366              : 
     367           15 :             Ok(Some(TimelineSnapshot {
     368           15 :                 loaded_at,
     369           15 :                 last_record_lsn,
     370           15 :                 current_exact_logical_size,
     371           15 :             }))
     372              :         }
     373           15 :     }
     374              : 
     375              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     376           27 :     fn to_metrics(
     377           27 :         &self,
     378           27 :         tenant_id: TenantId,
     379           27 :         timeline_id: TimelineId,
     380           27 :         now: DateTime<Utc>,
     381           27 :         metrics: &mut Vec<RawMetric>,
     382           27 :         cache: &Cache,
     383           27 :     ) {
     384           27 :         let timeline_written_size = u64::from(self.last_record_lsn);
     385           27 : 
     386           27 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     387           27 : 
     388           27 :         let last_stop_time = cache
     389           27 :             .get(written_size_delta_key.key())
     390           27 :             .map(|(until, _val)| {
     391           17 :                 until
     392           17 :                     .incremental_timerange()
     393           17 :                     .expect("never create EventType::Absolute for written_size_delta")
     394           17 :                     .end
     395           27 :             });
     396           27 : 
     397           27 :         let (key, written_size_now) =
     398           27 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     399           27 : 
     400           27 :         // by default, use the last sent written_size as the basis for
     401           27 :         // calculating the delta. if we don't yet have one, use the load time value.
     402           27 :         let prev = cache
     403           27 :             .get(&key)
     404           27 :             .map(|(prev_at, prev)| {
     405           19 :                 // use the prev time from our last incremental update, or default to latest
     406           19 :                 // absolute update on the first round.
     407           19 :                 let prev_at = prev_at
     408           19 :                     .absolute_time()
     409           19 :                     .expect("never create EventType::Incremental for written_size");
     410           19 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     411           19 :                 (*prev_at, *prev)
     412           27 :             })
     413           27 :             .unwrap_or_else(|| {
     414            8 :                 // if we don't have a previous point of comparison, compare to the load time
     415            8 :                 // lsn.
     416            8 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     417            8 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     418           27 :             });
     419           27 : 
     420           27 :         let up_to = now;
     421              : 
     422           27 :         if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
     423           23 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     424           23 :             // written_size_delta
     425           23 :             metrics.push(key_value);
     426           23 :             // written_size
     427           23 :             metrics.push((key, written_size_now));
     428           23 :         } else {
     429            4 :             // the cached value was ahead of us, report zero until we've caught up
     430            4 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     431            4 :             // the cached value was ahead of us, report the same until we've caught up
     432            4 :             metrics.push((key, (written_size_now.0, prev.1)));
     433            4 :         }
     434              : 
     435              :         {
     436           27 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     437           27 :             let current_or_previous = self
     438           27 :                 .current_exact_logical_size
     439           27 :                 .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
     440              : 
     441           27 :             if let Some(size) = current_or_previous {
     442           23 :                 metrics.push(factory.at(now, size));
     443           23 :             }
     444              :         }
     445           27 :     }
     446              : }
     447              : 
     448              : #[cfg(test)]
     449              : mod tests;
     450              : 
     451              : #[cfg(test)]
     452              : pub(crate) use tests::metric_examples;
        

Generated by: LCOV version 2.1-beta