LCOV - differential code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 94.9 % 273 259 14 259
Current Date: 2023-10-19 02:04:12 Functions: 54.8 % 93 51 42 51
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use crate::context::RequestContext;
       2                 : use anyhow::Context;
       3                 : use chrono::{DateTime, Utc};
       4                 : use consumption_metrics::EventType;
       5                 : use futures::stream::StreamExt;
       6                 : use serde_with::serde_as;
       7                 : use std::{sync::Arc, time::SystemTime};
       8                 : use utils::{
       9                 :     id::{TenantId, TimelineId},
      10                 :     lsn::Lsn,
      11                 : };
      12                 : 
      13                 : use super::{Cache, RawMetric};
      14                 : 
      15                 : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
      16                 : /// instead of static str.
      17                 : // Do not rename any of these without first consulting with data team and partner
      18                 : // management.
      19 CBC         188 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      20                 : pub(super) enum Name {
      21                 :     /// Timeline last_record_lsn, absolute
      22                 :     #[serde(rename = "written_size")]
      23                 :     WrittenSize,
      24                 :     /// Timeline last_record_lsn, incremental
      25                 :     #[serde(rename = "written_data_bytes_delta")]
      26                 :     WrittenSizeDelta,
      27                 :     /// Timeline logical size
      28                 :     #[serde(rename = "timeline_logical_size")]
      29                 :     LogicalSize,
      30                 :     /// Tenant remote size
      31                 :     #[serde(rename = "remote_storage_size")]
      32                 :     RemoteSize,
      33                 :     /// Tenant resident size
      34                 :     #[serde(rename = "resident_size")]
      35                 :     ResidentSize,
      36                 :     /// Tenant synthetic size
      37                 :     #[serde(rename = "synthetic_storage_size")]
      38                 :     SyntheticSize,
      39                 : }
      40                 : 
      41                 : /// Key that uniquely identifies the object this metric describes.
      42                 : ///
      43                 : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
      44                 : /// elsewhere.
      45                 : #[serde_with::serde_as]
      46             134 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      47                 : pub(crate) struct MetricsKey {
      48                 :     #[serde_as(as = "serde_with::DisplayFromStr")]
      49                 :     pub(super) tenant_id: TenantId,
      50                 : 
      51                 :     #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
      52                 :     #[serde(skip_serializing_if = "Option::is_none")]
      53                 :     pub(super) timeline_id: Option<TimelineId>,
      54                 : 
      55                 :     pub(super) metric: Name,
      56                 : }
      57                 : 
      58                 : impl MetricsKey {
      59             116 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      60             116 :         AbsoluteValueFactory(self)
      61             116 :     }
      62              27 :     const fn incremental_values(self) -> IncrementalValueFactory {
      63              27 :         IncrementalValueFactory(self)
      64              27 :     }
      65                 : }
      66                 : 
      67                 : /// Helper type which each individual metric kind can return to produce only absolute values.
      68                 : struct AbsoluteValueFactory(MetricsKey);
      69                 : 
      70                 : impl AbsoluteValueFactory {
      71             105 :     const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
      72             105 :         let key = self.0;
      73             105 :         (key, (EventType::Absolute { time }, val))
      74             105 :     }
      75                 : 
      76              15 :     fn key(&self) -> &MetricsKey {
      77              15 :         &self.0
      78              15 :     }
      79                 : }
      80                 : 
      81                 : /// Helper type which each individual metric kind can return to produce only incremental values.
      82                 : struct IncrementalValueFactory(MetricsKey);
      83                 : 
      84                 : impl IncrementalValueFactory {
      85                 :     #[allow(clippy::wrong_self_convention)]
      86              27 :     const fn from_until(
      87              27 :         self,
      88              27 :         prev_end: DateTime<Utc>,
      89              27 :         up_to: DateTime<Utc>,
      90              27 :         val: u64,
      91              27 :     ) -> RawMetric {
      92              27 :         let key = self.0;
      93              27 :         // cannot assert prev_end < up_to because these are realtime clock based
      94              27 :         let when = EventType::Incremental {
      95              27 :             start_time: prev_end,
      96              27 :             stop_time: up_to,
      97              27 :         };
      98              27 :         (key, (when, val))
      99              27 :     }
     100                 : 
     101              18 :     fn key(&self) -> &MetricsKey {
     102              18 :         &self.0
     103              18 :     }
     104                 : }
     105                 : 
     106                 : // the static part of a MetricsKey
     107                 : impl MetricsKey {
     108                 :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     109                 :     ///
     110                 :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     111              28 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     112              28 :         MetricsKey {
     113              28 :             tenant_id,
     114              28 :             timeline_id: Some(timeline_id),
     115              28 :             metric: Name::WrittenSize,
     116              28 :         }
     117              28 :         .absolute_values()
     118              28 :     }
     119                 : 
     120                 :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     121                 :     /// previously sent, starting from the previously sent incremental time range ending at the
     122                 :     /// latest absolute measurement.
     123              27 :     const fn written_size_delta(
     124              27 :         tenant_id: TenantId,
     125              27 :         timeline_id: TimelineId,
     126              27 :     ) -> IncrementalValueFactory {
     127              27 :         MetricsKey {
     128              27 :             tenant_id,
     129              27 :             timeline_id: Some(timeline_id),
     130              27 :             metric: Name::WrittenSizeDelta,
     131              27 :         }
     132              27 :         .incremental_values()
     133              27 :     }
     134                 : 
     135                 :     /// Exact [`Timeline::get_current_logical_size`].
     136                 :     ///
     137                 :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     138              25 :     const fn timeline_logical_size(
     139              25 :         tenant_id: TenantId,
     140              25 :         timeline_id: TimelineId,
     141              25 :     ) -> AbsoluteValueFactory {
     142              25 :         MetricsKey {
     143              25 :             tenant_id,
     144              25 :             timeline_id: Some(timeline_id),
     145              25 :             metric: Name::LogicalSize,
     146              25 :         }
     147              25 :         .absolute_values()
     148              25 :     }
     149                 : 
     150                 :     /// [`Tenant::remote_size`]
     151                 :     ///
     152                 :     /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     153              21 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     154              21 :         MetricsKey {
     155              21 :             tenant_id,
     156              21 :             timeline_id: None,
     157              21 :             metric: Name::RemoteSize,
     158              21 :         }
     159              21 :         .absolute_values()
     160              21 :     }
     161                 : 
     162                 :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     163                 :     ///
     164                 :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     165              21 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     166              21 :         MetricsKey {
     167              21 :             tenant_id,
     168              21 :             timeline_id: None,
     169              21 :             metric: Name::ResidentSize,
     170              21 :         }
     171              21 :         .absolute_values()
     172              21 :     }
     173                 : 
     174                 :     /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     175                 :     ///
     176                 :     /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     177                 :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     178              21 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     179              21 :         MetricsKey {
     180              21 :             tenant_id,
     181              21 :             timeline_id: None,
     182              21 :             metric: Name::SyntheticSize,
     183              21 :         }
     184              21 :         .absolute_values()
     185              21 :     }
     186                 : }
     187                 : 
     188              18 : pub(super) async fn collect_all_metrics(
     189              18 :     cached_metrics: &Cache,
     190              18 :     ctx: &RequestContext,
     191              18 : ) -> Vec<RawMetric> {
     192              18 :     use pageserver_api::models::TenantState;
     193              18 : 
     194              18 :     let started_at = std::time::Instant::now();
     195                 : 
     196              18 :     let tenants = match crate::tenant::mgr::list_tenants().await {
     197              18 :         Ok(tenants) => tenants,
     198 UBC           0 :         Err(err) => {
     199               0 :             tracing::error!("failed to list tenants: {:?}", err);
     200               0 :             return vec![];
     201                 :         }
     202                 :     };
     203                 : 
     204 CBC          18 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
     205              15 :         if state != TenantState::Active {
     206 UBC           0 :             None
     207                 :         } else {
     208 CBC          15 :             crate::tenant::mgr::get_tenant(id, true)
     209 UBC           0 :                 .await
     210 CBC          15 :                 .ok()
     211              15 :                 .map(|tenant| (id, tenant))
     212                 :         }
     213              18 :     });
     214                 : 
     215              18 :     let res = collect(tenants, cached_metrics, ctx).await;
     216                 : 
     217              18 :     tracing::info!(
     218              18 :         elapsed_ms = started_at.elapsed().as_millis(),
     219              18 :         total = res.len(),
     220              18 :         "collected metrics"
     221              18 :     );
     222                 : 
     223              18 :     res
     224              18 : }
     225                 : 
     226              18 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
     227              18 : where
     228              18 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
     229              18 : {
     230              18 :     let mut current_metrics: Vec<RawMetric> = Vec::new();
     231              18 : 
     232              18 :     let mut tenants = std::pin::pin!(tenants);
     233                 : 
     234              33 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     235              15 :         let mut tenant_resident_size = 0;
     236                 : 
     237              15 :         for timeline in tenant.list_timelines() {
     238              12 :             let timeline_id = timeline.timeline_id;
     239              12 : 
     240              12 :             match TimelineSnapshot::collect(&timeline, ctx) {
     241              12 :                 Ok(Some(snap)) => {
     242              12 :                     snap.to_metrics(
     243              12 :                         tenant_id,
     244              12 :                         timeline_id,
     245              12 :                         Utc::now(),
     246              12 :                         &mut current_metrics,
     247              12 :                         cache,
     248              12 :                     );
     249              12 :                 }
     250 UBC           0 :                 Ok(None) => {}
     251               0 :                 Err(e) => {
     252               0 :                     tracing::error!(
     253               0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     254               0 :                         timeline.timeline_id
     255               0 :                     );
     256               0 :                     continue;
     257                 :                 }
     258                 :             }
     259                 : 
     260 CBC          12 :             tenant_resident_size += timeline.resident_physical_size();
     261                 :         }
     262                 : 
     263              15 :         let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
     264              15 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     265                 :     }
     266                 : 
     267              18 :     current_metrics
     268              18 : }
     269                 : 
     270                 : /// In-between abstraction to allow testing metrics without actual Tenants.
     271                 : struct TenantSnapshot {
     272                 :     resident_size: u64,
     273                 :     remote_size: u64,
     274                 :     synthetic_size: u64,
     275                 : }
     276                 : 
     277                 : impl TenantSnapshot {
     278                 :     /// Collect tenant status to have metrics created out of it.
     279                 :     ///
     280                 :     /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
     281                 :     /// cannot just list timelines here.
     282              15 :     fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
     283              15 :         TenantSnapshot {
     284              15 :             resident_size,
     285              15 :             remote_size: t.remote_size(),
     286              15 :             // Note that this metric is calculated in a separate bgworker
     287              15 :             // Here we only use cached value, which may lag behind the real latest one
     288              15 :             synthetic_size: t.cached_synthetic_size(),
     289              15 :         }
     290              15 :     }
     291                 : 
     292              17 :     fn to_metrics(
     293              17 :         &self,
     294              17 :         tenant_id: TenantId,
     295              17 :         now: DateTime<Utc>,
     296              17 :         cached: &Cache,
     297              17 :         metrics: &mut Vec<RawMetric>,
     298              17 :     ) {
     299              17 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     300              17 : 
     301              17 :         let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
     302                 : 
     303              17 :         let synthetic_size = {
     304              17 :             let factory = MetricsKey::synthetic_size(tenant_id);
     305              17 :             let mut synthetic_size = self.synthetic_size;
     306              17 : 
     307              17 :             if synthetic_size == 0 {
     308              12 :                 if let Some((_, value)) = cached.get(factory.key()) {
     309               3 :                     // use the latest value from previous session
     310               3 :                     synthetic_size = *value;
     311               9 :                 }
     312               5 :             }
     313                 : 
     314              17 :             if synthetic_size != 0 {
     315                 :                 // only send non-zeroes because otherwise these show up as errors in logs
     316               8 :                 Some(factory.at(now, synthetic_size))
     317                 :             } else {
     318               9 :                 None
     319                 :             }
     320                 :         };
     321                 : 
     322              17 :         metrics.extend(
     323              17 :             [Some(remote_size), Some(resident_size), synthetic_size]
     324              17 :                 .into_iter()
     325              17 :                 .flatten(),
     326              17 :         );
     327              17 :     }
     328                 : }
     329                 : 
     330                 : /// Internal type to make timeline metric production testable.
     331                 : ///
     332                 : /// As this value type contains all of the information needed from a timeline to produce the
     333                 : /// metrics, it can easily be created with different values in test.
     334                 : struct TimelineSnapshot {
     335                 :     loaded_at: (Lsn, SystemTime),
     336                 :     last_record_lsn: Lsn,
     337                 :     current_exact_logical_size: Option<u64>,
     338                 : }
     339                 : 
     340                 : impl TimelineSnapshot {
     341                 :     /// Collect the metrics from an actual timeline.
     342                 :     ///
     343                 :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     344                 :     ///
     345                 :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     346              12 :     fn collect(
     347              12 :         t: &Arc<crate::tenant::Timeline>,
     348              12 :         ctx: &RequestContext,
     349              12 :     ) -> anyhow::Result<Option<Self>> {
     350              12 :         if !t.is_active() {
     351                 :             // no collection for broken or stopping needed, we will still keep the cached values
     352                 :             // though at the caller.
     353 UBC           0 :             Ok(None)
     354                 :         } else {
     355 CBC          12 :             let loaded_at = t.loaded_at;
     356              12 :             let last_record_lsn = t.get_last_record_lsn();
     357                 : 
     358              12 :             let current_exact_logical_size = {
     359              12 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
     360              12 :                 let res = span
     361              12 :                     .in_scope(|| t.get_current_logical_size(ctx))
     362              12 :                     .context("get_current_logical_size");
     363              12 :                 match res? {
     364                 :                     // Only send timeline logical size when it is fully calculated.
     365              12 :                     (size, is_exact) if is_exact => Some(size),
     366 UBC           0 :                     (_, _) => None,
     367                 :                 }
     368                 :             };
     369                 : 
     370 CBC          12 :             Ok(Some(TimelineSnapshot {
     371              12 :                 loaded_at,
     372              12 :                 last_record_lsn,
     373              12 :                 current_exact_logical_size,
     374              12 :             }))
     375                 :         }
     376              12 :     }
     377                 : 
     378                 :     /// Produce the timeline consumption metrics into the `metrics` argument.
     379              18 :     fn to_metrics(
     380              18 :         &self,
     381              18 :         tenant_id: TenantId,
     382              18 :         timeline_id: TimelineId,
     383              18 :         now: DateTime<Utc>,
     384              18 :         metrics: &mut Vec<RawMetric>,
     385              18 :         cache: &Cache,
     386              18 :     ) {
     387              18 :         let timeline_written_size = u64::from(self.last_record_lsn);
     388              18 : 
     389              18 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     390              18 : 
     391              18 :         let last_stop_time = cache
     392              18 :             .get(written_size_delta_key.key())
     393              18 :             .map(|(until, _val)| {
     394              11 :                 until
     395              11 :                     .incremental_timerange()
     396              11 :                     .expect("never create EventType::Absolute for written_size_delta")
     397              11 :                     .end
     398              18 :             });
     399              18 : 
     400              18 :         let (key, written_size_now) =
     401              18 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     402              18 : 
     403              18 :         // by default, use the last sent written_size as the basis for
     404              18 :         // calculating the delta. if we don't yet have one, use the load time value.
     405              18 :         let prev = cache
     406              18 :             .get(&key)
     407              18 :             .map(|(prev_at, prev)| {
     408              12 :                 // use the prev time from our last incremental update, or default to latest
     409              12 :                 // absolute update on the first round.
     410              12 :                 let prev_at = prev_at
     411              12 :                     .absolute_time()
     412              12 :                     .expect("never create EventType::Incremental for written_size");
     413              12 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     414              12 :                 (*prev_at, *prev)
     415              18 :             })
     416              18 :             .unwrap_or_else(|| {
     417               6 :                 // if we don't have a previous point of comparison, compare to the load time
     418               6 :                 // lsn.
     419               6 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     420               6 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     421              18 :             });
     422              18 : 
     423              18 :         let up_to = now;
     424                 : 
     425              18 :         if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
     426              16 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     427              16 :             // written_size_delta
     428              16 :             metrics.push(key_value);
     429              16 :             // written_size
     430              16 :             metrics.push((key, written_size_now));
     431              16 :         } else {
     432               2 :             // the cached value was ahead of us, report zero until we've caught up
     433               2 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     434               2 :             // the cached value was ahead of us, report the same until we've caught up
     435               2 :             metrics.push((key, (written_size_now.0, prev.1)));
     436               2 :         }
     437                 : 
     438                 :         {
     439              18 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     440              18 :             let current_or_previous = self
     441              18 :                 .current_exact_logical_size
     442              18 :                 .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
     443                 : 
     444              18 :             if let Some(size) = current_or_previous {
     445              16 :                 metrics.push(factory.at(now, size));
     446              16 :             }
     447                 :         }
     448              18 :     }
     449                 : }
     450                 : 
     451                 : #[cfg(test)]
     452                 : mod tests;
     453                 : 
     454                 : #[cfg(test)]
     455                 : pub(crate) use tests::metric_examples;
        

Generated by: LCOV version 2.1-beta