LCOV - differential code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 96.0 % 275 264 11 264
Current Date: 2024-01-09 02:06:09 Functions: 59.3 % 81 48 33 48
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
       2                 : use chrono::{DateTime, Utc};
       3                 : use consumption_metrics::EventType;
       4                 : use futures::stream::StreamExt;
       5                 : use std::{sync::Arc, time::SystemTime};
       6                 : use utils::{
       7                 :     id::{TenantId, TimelineId},
       8                 :     lsn::Lsn,
       9                 : };
      10                 : 
      11                 : use super::{Cache, RawMetric};
      12                 : 
      13                 : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
      14                 : /// instead of static str.
      15                 : // Do not rename any of these without first consulting with data team and partner
      16                 : // management.
      17 CBC         278 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      18                 : pub(super) enum Name {
      19                 :     /// Timeline last_record_lsn, absolute
      20                 :     #[serde(rename = "written_size")]
      21                 :     WrittenSize,
      22                 :     /// Timeline last_record_lsn, incremental
      23                 :     #[serde(rename = "written_data_bytes_delta")]
      24                 :     WrittenSizeDelta,
      25                 :     /// Timeline logical size
      26                 :     #[serde(rename = "timeline_logical_size")]
      27                 :     LogicalSize,
      28                 :     /// Tenant remote size
      29                 :     #[serde(rename = "remote_storage_size")]
      30                 :     RemoteSize,
      31                 :     /// Tenant resident size
      32                 :     #[serde(rename = "resident_size")]
      33                 :     ResidentSize,
      34                 :     /// Tenant synthetic size
      35                 :     #[serde(rename = "synthetic_storage_size")]
      36                 :     SyntheticSize,
      37                 : }
      38                 : 
      39                 : /// Key that uniquely identifies the object this metric describes.
      40                 : ///
      41                 : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
      42                 : /// elsewhere.
      43             189 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      44                 : pub(crate) struct MetricsKey {
      45                 :     pub(super) tenant_id: TenantId,
      46                 : 
      47                 :     #[serde(skip_serializing_if = "Option::is_none")]
      48                 :     pub(super) timeline_id: Option<TimelineId>,
      49                 : 
      50                 :     pub(super) metric: Name,
      51                 : }
      52                 : 
      53                 : impl MetricsKey {
      54             162 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      55             162 :         AbsoluteValueFactory(self)
      56             162 :     }
      57              32 :     const fn incremental_values(self) -> IncrementalValueFactory {
      58              32 :         IncrementalValueFactory(self)
      59              32 :     }
      60                 : }
      61                 : 
      62                 : /// Helper type which each individual metric kind can return to produce only absolute values.
      63                 : struct AbsoluteValueFactory(MetricsKey);
      64                 : 
      65                 : impl AbsoluteValueFactory {
      66             145 :     const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
      67             145 :         let key = self.0;
      68             145 :         (key, (EventType::Absolute { time }, val))
      69             145 :     }
      70                 : 
      71              21 :     fn key(&self) -> &MetricsKey {
      72              21 :         &self.0
      73              21 :     }
      74                 : }
      75                 : 
      76                 : /// Helper type which each individual metric kind can return to produce only incremental values.
      77                 : struct IncrementalValueFactory(MetricsKey);
      78                 : 
      79                 : impl IncrementalValueFactory {
      80                 :     #[allow(clippy::wrong_self_convention)]
      81              32 :     const fn from_until(
      82              32 :         self,
      83              32 :         prev_end: DateTime<Utc>,
      84              32 :         up_to: DateTime<Utc>,
      85              32 :         val: u64,
      86              32 :     ) -> RawMetric {
      87              32 :         let key = self.0;
      88              32 :         // cannot assert prev_end < up_to because these are realtime clock based
      89              32 :         let when = EventType::Incremental {
      90              32 :             start_time: prev_end,
      91              32 :             stop_time: up_to,
      92              32 :         };
      93              32 :         (key, (when, val))
      94              32 :     }
      95                 : 
      96              23 :     fn key(&self) -> &MetricsKey {
      97              23 :         &self.0
      98              23 :     }
      99                 : }
     100                 : 
     101                 : // the static part of a MetricsKey
     102                 : impl MetricsKey {
     103                 :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     104                 :     ///
     105                 :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     106              33 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     107              33 :         MetricsKey {
     108              33 :             tenant_id,
     109              33 :             timeline_id: Some(timeline_id),
     110              33 :             metric: Name::WrittenSize,
     111              33 :         }
     112              33 :         .absolute_values()
     113              33 :     }
     114                 : 
     115                 :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     116                 :     /// previously sent, starting from the previously sent incremental time range ending at the
     117                 :     /// latest absolute measurement.
     118              32 :     const fn written_size_delta(
     119              32 :         tenant_id: TenantId,
     120              32 :         timeline_id: TimelineId,
     121              32 :     ) -> IncrementalValueFactory {
     122              32 :         MetricsKey {
     123              32 :             tenant_id,
     124              32 :             timeline_id: Some(timeline_id),
     125              32 :             metric: Name::WrittenSizeDelta,
     126              32 :         }
     127              32 :         .incremental_values()
     128              32 :     }
     129                 : 
     130                 :     /// Exact [`Timeline::get_current_logical_size`].
     131                 :     ///
     132                 :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     133              30 :     const fn timeline_logical_size(
     134              30 :         tenant_id: TenantId,
     135              30 :         timeline_id: TimelineId,
     136              30 :     ) -> AbsoluteValueFactory {
     137              30 :         MetricsKey {
     138              30 :             tenant_id,
     139              30 :             timeline_id: Some(timeline_id),
     140              30 :             metric: Name::LogicalSize,
     141              30 :         }
     142              30 :         .absolute_values()
     143              30 :     }
     144                 : 
     145                 :     /// [`Tenant::remote_size`]
     146                 :     ///
     147                 :     /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     148              33 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     149              33 :         MetricsKey {
     150              33 :             tenant_id,
     151              33 :             timeline_id: None,
     152              33 :             metric: Name::RemoteSize,
     153              33 :         }
     154              33 :         .absolute_values()
     155              33 :     }
     156                 : 
     157                 :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     158                 :     ///
     159                 :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     160              33 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     161              33 :         MetricsKey {
     162              33 :             tenant_id,
     163              33 :             timeline_id: None,
     164              33 :             metric: Name::ResidentSize,
     165              33 :         }
     166              33 :         .absolute_values()
     167              33 :     }
     168                 : 
     169                 :     /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     170                 :     ///
     171                 :     /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     172                 :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     173              33 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     174              33 :         MetricsKey {
     175              33 :             tenant_id,
     176              33 :             timeline_id: None,
     177              33 :             metric: Name::SyntheticSize,
     178              33 :         }
     179              33 :         .absolute_values()
     180              33 :     }
     181                 : }
     182                 : 
     183              30 : pub(super) async fn collect_all_metrics(
     184              30 :     cached_metrics: &Cache,
     185              30 :     ctx: &RequestContext,
     186              30 : ) -> Vec<RawMetric> {
     187              30 :     use pageserver_api::models::TenantState;
     188              30 : 
     189              30 :     let started_at = std::time::Instant::now();
     190                 : 
     191              30 :     let tenants = match crate::tenant::mgr::list_tenants().await {
     192              30 :         Ok(tenants) => tenants,
     193 UBC           0 :         Err(err) => {
     194               0 :             tracing::error!("failed to list tenants: {:?}", err);
     195               0 :             return vec![];
     196                 :         }
     197                 :     };
     198                 : 
     199 CBC          30 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
     200              27 :         if state != TenantState::Active || !id.is_zero() {
     201 UBC           0 :             None
     202                 :         } else {
     203 CBC          27 :             crate::tenant::mgr::get_tenant(id, true)
     204              27 :                 .ok()
     205              27 :                 .map(|tenant| (id.tenant_id, tenant))
     206                 :         }
     207              30 :     });
     208                 : 
     209              30 :     let res = collect(tenants, cached_metrics, ctx).await;
     210                 : 
     211              30 :     tracing::info!(
     212              30 :         elapsed_ms = started_at.elapsed().as_millis(),
     213              30 :         total = res.len(),
     214              30 :         "collected metrics"
     215              30 :     );
     216                 : 
     217              30 :     res
     218              30 : }
     219                 : 
     220              30 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
     221              30 : where
     222              30 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
     223              30 : {
     224              30 :     let mut current_metrics: Vec<RawMetric> = Vec::new();
     225              30 : 
     226              30 :     let mut tenants = std::pin::pin!(tenants);
     227                 : 
     228              57 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     229              27 :         let mut tenant_resident_size = 0;
     230                 : 
     231              27 :         for timeline in tenant.list_timelines() {
     232              18 :             let timeline_id = timeline.timeline_id;
     233              18 : 
     234              18 :             match TimelineSnapshot::collect(&timeline, ctx) {
     235              17 :                 Ok(Some(snap)) => {
     236              17 :                     snap.to_metrics(
     237              17 :                         tenant_id,
     238              17 :                         timeline_id,
     239              17 :                         Utc::now(),
     240              17 :                         &mut current_metrics,
     241              17 :                         cache,
     242              17 :                     );
     243              17 :                 }
     244               1 :                 Ok(None) => {}
     245 UBC           0 :                 Err(e) => {
     246               0 :                     tracing::error!(
     247               0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     248               0 :                         timeline.timeline_id
     249               0 :                     );
     250               0 :                     continue;
     251                 :                 }
     252                 :             }
     253                 : 
     254 CBC          18 :             tenant_resident_size += timeline.resident_physical_size();
     255                 :         }
     256                 : 
     257              27 :         let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
     258              27 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     259                 :     }
     260                 : 
     261              30 :     current_metrics
     262              30 : }
     263                 : 
     264                 : /// In-between abstraction to allow testing metrics without actual Tenants.
     265                 : struct TenantSnapshot {
     266                 :     resident_size: u64,
     267                 :     remote_size: u64,
     268                 :     synthetic_size: u64,
     269                 : }
     270                 : 
     271                 : impl TenantSnapshot {
     272                 :     /// Collect tenant status to have metrics created out of it.
     273                 :     ///
     274                 :     /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
     275                 :     /// cannot just list timelines here.
     276              27 :     fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
     277              27 :         TenantSnapshot {
     278              27 :             resident_size,
     279              27 :             remote_size: t.remote_size(),
     280              27 :             // Note that this metric is calculated in a separate bgworker
     281              27 :             // Here we only use cached value, which may lag behind the real latest one
     282              27 :             synthetic_size: t.cached_synthetic_size(),
     283              27 :         }
     284              27 :     }
     285                 : 
     286              29 :     fn to_metrics(
     287              29 :         &self,
     288              29 :         tenant_id: TenantId,
     289              29 :         now: DateTime<Utc>,
     290              29 :         cached: &Cache,
     291              29 :         metrics: &mut Vec<RawMetric>,
     292              29 :     ) {
     293              29 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     294              29 : 
     295              29 :         let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
     296                 : 
     297              29 :         let synthetic_size = {
     298              29 :             let factory = MetricsKey::synthetic_size(tenant_id);
     299              29 :             let mut synthetic_size = self.synthetic_size;
     300              29 : 
     301              29 :             if synthetic_size == 0 {
     302              18 :                 if let Some((_, value)) = cached.get(factory.key()) {
     303               3 :                     // use the latest value from previous session
     304               3 :                     synthetic_size = *value;
     305              15 :                 }
     306              11 :             }
     307                 : 
     308              29 :             if synthetic_size != 0 {
     309                 :                 // only send non-zeroes because otherwise these show up as errors in logs
     310              14 :                 Some(factory.at(now, synthetic_size))
     311                 :             } else {
     312              15 :                 None
     313                 :             }
     314                 :         };
     315                 : 
     316              29 :         metrics.extend(
     317              29 :             [Some(remote_size), Some(resident_size), synthetic_size]
     318              29 :                 .into_iter()
     319              29 :                 .flatten(),
     320              29 :         );
     321              29 :     }
     322                 : }
     323                 : 
     324                 : /// Internal type to make timeline metric production testable.
     325                 : ///
     326                 : /// As this value type contains all of the information needed from a timeline to produce the
     327                 : /// metrics, it can easily be created with different values in test.
     328                 : struct TimelineSnapshot {
     329                 :     loaded_at: (Lsn, SystemTime),
     330                 :     last_record_lsn: Lsn,
     331                 :     current_exact_logical_size: Option<u64>,
     332                 : }
     333                 : 
     334                 : impl TimelineSnapshot {
     335                 :     /// Collect the metrics from an actual timeline.
     336                 :     ///
     337                 :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     338                 :     ///
     339                 :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     340              18 :     fn collect(
     341              18 :         t: &Arc<crate::tenant::Timeline>,
     342              18 :         ctx: &RequestContext,
     343              18 :     ) -> anyhow::Result<Option<Self>> {
     344              18 :         if !t.is_active() {
     345                 :             // no collection for broken or stopping needed, we will still keep the cached values
     346                 :             // though at the caller.
     347               1 :             Ok(None)
     348                 :         } else {
     349              17 :             let loaded_at = t.loaded_at;
     350              17 :             let last_record_lsn = t.get_last_record_lsn();
     351                 : 
     352              17 :             let current_exact_logical_size = {
     353              17 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     354              17 :                 let size = span.in_scope(|| {
     355              17 :                     t.get_current_logical_size(
     356              17 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     357              17 :                         ctx,
     358              17 :                     )
     359              17 :                 });
     360              17 :                 match size {
     361                 :                     // Only send timeline logical size when it is fully calculated.
     362              17 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     363 UBC           0 :                     CurrentLogicalSize::Approximate(_) => None,
     364                 :                 }
     365                 :             };
     366                 : 
     367 CBC          17 :             Ok(Some(TimelineSnapshot {
     368              17 :                 loaded_at,
     369              17 :                 last_record_lsn,
     370              17 :                 current_exact_logical_size,
     371              17 :             }))
     372                 :         }
     373              18 :     }
     374                 : 
     375                 :     /// Produce the timeline consumption metrics into the `metrics` argument.
     376              23 :     fn to_metrics(
     377              23 :         &self,
     378              23 :         tenant_id: TenantId,
     379              23 :         timeline_id: TimelineId,
     380              23 :         now: DateTime<Utc>,
     381              23 :         metrics: &mut Vec<RawMetric>,
     382              23 :         cache: &Cache,
     383              23 :     ) {
     384              23 :         let timeline_written_size = u64::from(self.last_record_lsn);
     385              23 : 
     386              23 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     387              23 : 
     388              23 :         let last_stop_time = cache
     389              23 :             .get(written_size_delta_key.key())
     390              23 :             .map(|(until, _val)| {
     391              15 :                 until
     392              15 :                     .incremental_timerange()
     393              15 :                     .expect("never create EventType::Absolute for written_size_delta")
     394              15 :                     .end
     395              23 :             });
     396              23 : 
     397              23 :         let (key, written_size_now) =
     398              23 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     399              23 : 
     400              23 :         // by default, use the last sent written_size as the basis for
     401              23 :         // calculating the delta. if we don't yet have one, use the load time value.
     402              23 :         let prev = cache
     403              23 :             .get(&key)
     404              23 :             .map(|(prev_at, prev)| {
     405              16 :                 // use the prev time from our last incremental update, or default to latest
     406              16 :                 // absolute update on the first round.
     407              16 :                 let prev_at = prev_at
     408              16 :                     .absolute_time()
     409              16 :                     .expect("never create EventType::Incremental for written_size");
     410              16 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     411              16 :                 (*prev_at, *prev)
     412              23 :             })
     413              23 :             .unwrap_or_else(|| {
     414               7 :                 // if we don't have a previous point of comparison, compare to the load time
     415               7 :                 // lsn.
     416               7 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     417               7 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     418              23 :             });
     419              23 : 
     420              23 :         let up_to = now;
     421                 : 
     422              23 :         if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
     423              21 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     424              21 :             // written_size_delta
     425              21 :             metrics.push(key_value);
     426              21 :             // written_size
     427              21 :             metrics.push((key, written_size_now));
     428              21 :         } else {
     429               2 :             // the cached value was ahead of us, report zero until we've caught up
     430               2 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     431               2 :             // the cached value was ahead of us, report the same until we've caught up
     432               2 :             metrics.push((key, (written_size_now.0, prev.1)));
     433               2 :         }
     434                 : 
     435                 :         {
     436              23 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     437              23 :             let current_or_previous = self
     438              23 :                 .current_exact_logical_size
     439              23 :                 .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
     440                 : 
     441              23 :             if let Some(size) = current_or_previous {
     442              21 :                 metrics.push(factory.at(now, size));
     443              21 :             }
     444                 :         }
     445              23 :     }
     446                 : }
     447                 : 
     448                 : #[cfg(test)]
     449                 : mod tests;
     450                 : 
     451                 : #[cfg(test)]
     452                 : pub(crate) use tests::metric_examples;
        

Generated by: LCOV version 2.1-beta