LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - metrics.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 65.1 % 275 179
Test Date: 2024-02-29 11:57:12 Functions: 35.4 % 82 29

            Line data    Source code
       1              : use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
       2              : use chrono::{DateTime, Utc};
       3              : use consumption_metrics::EventType;
       4              : use futures::stream::StreamExt;
       5              : use std::{sync::Arc, time::SystemTime};
       6              : use utils::{
       7              :     id::{TenantId, TimelineId},
       8              :     lsn::Lsn,
       9              : };
      10              : 
      11              : use super::{Cache, RawMetric};
      12              : 
      13              : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
      14              : /// instead of static str.
      15              : // Do not rename any of these without first consulting with data team and partner
      16              : // management.
      17          144 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      18              : pub(super) enum Name {
      19              :     /// Timeline last_record_lsn, absolute
      20              :     #[serde(rename = "written_size")]
      21              :     WrittenSize,
      22              :     /// Timeline last_record_lsn, incremental
      23              :     #[serde(rename = "written_data_bytes_delta")]
      24              :     WrittenSizeDelta,
      25              :     /// Timeline logical size
      26              :     #[serde(rename = "timeline_logical_size")]
      27              :     LogicalSize,
      28              :     /// Tenant remote size
      29              :     #[serde(rename = "remote_storage_size")]
      30              :     RemoteSize,
      31              :     /// Tenant resident size
      32              :     #[serde(rename = "resident_size")]
      33              :     ResidentSize,
      34              :     /// Tenant synthetic size
      35              :     #[serde(rename = "synthetic_storage_size")]
      36              :     SyntheticSize,
      37              : }
      38              : 
      39              : /// Key that uniquely identifies the object this metric describes.
      40              : ///
      41              : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
      42              : /// elsewhere.
      43           60 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
      44              : pub(crate) struct MetricsKey {
      45              :     pub(super) tenant_id: TenantId,
      46              : 
      47              :     #[serde(skip_serializing_if = "Option::is_none")]
      48              :     pub(super) timeline_id: Option<TimelineId>,
      49              : 
      50              :     pub(super) metric: Name,
      51              : }
      52              : 
      53              : impl MetricsKey {
      54           94 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      55           94 :         AbsoluteValueFactory(self)
      56           94 :     }
      57           30 :     const fn incremental_values(self) -> IncrementalValueFactory {
      58           30 :         IncrementalValueFactory(self)
      59           30 :     }
      60              : }
      61              : 
      62              : /// Helper type which each individual metric kind can return to produce only absolute values.
      63              : struct AbsoluteValueFactory(MetricsKey);
      64              : 
      65              : impl AbsoluteValueFactory {
      66           88 :     const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
      67           88 :         let key = self.0;
      68           88 :         (key, (EventType::Absolute { time }, val))
      69           88 :     }
      70              : 
      71           10 :     fn key(&self) -> &MetricsKey {
      72           10 :         &self.0
      73           10 :     }
      74              : }
      75              : 
      76              : /// Helper type which each individual metric kind can return to produce only incremental values.
      77              : struct IncrementalValueFactory(MetricsKey);
      78              : 
      79              : impl IncrementalValueFactory {
      80              :     #[allow(clippy::wrong_self_convention)]
      81           30 :     const fn from_until(
      82           30 :         self,
      83           30 :         prev_end: DateTime<Utc>,
      84           30 :         up_to: DateTime<Utc>,
      85           30 :         val: u64,
      86           30 :     ) -> RawMetric {
      87           30 :         let key = self.0;
      88           30 :         // cannot assert prev_end < up_to because these are realtime clock based
      89           30 :         let when = EventType::Incremental {
      90           30 :             start_time: prev_end,
      91           30 :             stop_time: up_to,
      92           30 :         };
      93           30 :         (key, (when, val))
      94           30 :     }
      95              : 
      96           12 :     fn key(&self) -> &MetricsKey {
      97           12 :         &self.0
      98           12 :     }
      99              : }
     100              : 
     101              : // the static part of a MetricsKey
     102              : impl MetricsKey {
     103              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
     104              :     ///
     105              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     106           32 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
     107           32 :         MetricsKey {
     108           32 :             tenant_id,
     109           32 :             timeline_id: Some(timeline_id),
     110           32 :             metric: Name::WrittenSize,
     111           32 :         }
     112           32 :         .absolute_values()
     113           32 :     }
     114              : 
     115              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     116              :     /// previously sent, starting from the previously sent incremental time range ending at the
     117              :     /// latest absolute measurement.
     118           30 :     const fn written_size_delta(
     119           30 :         tenant_id: TenantId,
     120           30 :         timeline_id: TimelineId,
     121           30 :     ) -> IncrementalValueFactory {
     122           30 :         MetricsKey {
     123           30 :             tenant_id,
     124           30 :             timeline_id: Some(timeline_id),
     125           30 :             metric: Name::WrittenSizeDelta,
     126           30 :         }
     127           30 :         .incremental_values()
     128           30 :     }
     129              : 
     130              :     /// Exact [`Timeline::get_current_logical_size`].
     131              :     ///
     132              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     133           26 :     const fn timeline_logical_size(
     134           26 :         tenant_id: TenantId,
     135           26 :         timeline_id: TimelineId,
     136           26 :     ) -> AbsoluteValueFactory {
     137           26 :         MetricsKey {
     138           26 :             tenant_id,
     139           26 :             timeline_id: Some(timeline_id),
     140           26 :             metric: Name::LogicalSize,
     141           26 :         }
     142           26 :         .absolute_values()
     143           26 :     }
     144              : 
     145              :     /// [`Tenant::remote_size`]
     146              :     ///
     147              :     /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     148           12 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     149           12 :         MetricsKey {
     150           12 :             tenant_id,
     151           12 :             timeline_id: None,
     152           12 :             metric: Name::RemoteSize,
     153           12 :         }
     154           12 :         .absolute_values()
     155           12 :     }
     156              : 
     157              :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     158              :     ///
     159              :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     160           12 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     161           12 :         MetricsKey {
     162           12 :             tenant_id,
     163           12 :             timeline_id: None,
     164           12 :             metric: Name::ResidentSize,
     165           12 :         }
     166           12 :         .absolute_values()
     167           12 :     }
     168              : 
     169              :     /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     170              :     ///
     171              :     /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     172              :     /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
     173           12 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     174           12 :         MetricsKey {
     175           12 :             tenant_id,
     176           12 :             timeline_id: None,
     177           12 :             metric: Name::SyntheticSize,
     178           12 :         }
     179           12 :         .absolute_values()
     180           12 :     }
     181              : }
     182              : 
     183            0 : pub(super) async fn collect_all_metrics(
     184            0 :     cached_metrics: &Cache,
     185            0 :     ctx: &RequestContext,
     186            0 : ) -> Vec<RawMetric> {
     187            0 :     use pageserver_api::models::TenantState;
     188            0 : 
     189            0 :     let started_at = std::time::Instant::now();
     190              : 
     191            0 :     let tenants = match crate::tenant::mgr::list_tenants().await {
     192            0 :         Ok(tenants) => tenants,
     193            0 :         Err(err) => {
     194            0 :             tracing::error!("failed to list tenants: {:?}", err);
     195            0 :             return vec![];
     196              :         }
     197              :     };
     198              : 
     199            0 :     let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
     200            0 :         if state != TenantState::Active || !id.is_zero() {
     201            0 :             None
     202              :         } else {
     203            0 :             crate::tenant::mgr::get_tenant(id, true)
     204            0 :                 .ok()
     205            0 :                 .map(|tenant| (id.tenant_id, tenant))
     206              :         }
     207            0 :     });
     208              : 
     209            0 :     let res = collect(tenants, cached_metrics, ctx).await;
     210              : 
     211            0 :     tracing::info!(
     212            0 :         elapsed_ms = started_at.elapsed().as_millis(),
     213            0 :         total = res.len(),
     214            0 :         "collected metrics"
     215            0 :     );
     216              : 
     217            0 :     res
     218            0 : }
     219              : 
     220            0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
     221            0 : where
     222            0 :     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
     223            0 : {
     224            0 :     let mut current_metrics: Vec<RawMetric> = Vec::new();
     225            0 : 
     226            0 :     let mut tenants = std::pin::pin!(tenants);
     227              : 
     228            0 :     while let Some((tenant_id, tenant)) = tenants.next().await {
     229            0 :         let mut tenant_resident_size = 0;
     230              : 
     231            0 :         for timeline in tenant.list_timelines() {
     232            0 :             let timeline_id = timeline.timeline_id;
     233            0 : 
     234            0 :             match TimelineSnapshot::collect(&timeline, ctx) {
     235            0 :                 Ok(Some(snap)) => {
     236            0 :                     snap.to_metrics(
     237            0 :                         tenant_id,
     238            0 :                         timeline_id,
     239            0 :                         Utc::now(),
     240            0 :                         &mut current_metrics,
     241            0 :                         cache,
     242            0 :                     );
     243            0 :                 }
     244            0 :                 Ok(None) => {}
     245            0 :                 Err(e) => {
     246            0 :                     tracing::error!(
     247            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     248            0 :                         timeline.timeline_id
     249            0 :                     );
     250            0 :                     continue;
     251              :                 }
     252              :             }
     253              : 
     254            0 :             tenant_resident_size += timeline.resident_physical_size();
     255              :         }
     256              : 
     257            0 :         let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
     258            0 :         snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
     259              :     }
     260              : 
     261            0 :     current_metrics
     262            0 : }
     263              : 
     264              : /// In-between abstraction to allow testing metrics without actual Tenants.
     265              : struct TenantSnapshot {
     266              :     resident_size: u64,
     267              :     remote_size: u64,
     268              :     synthetic_size: u64,
     269              : }
     270              : 
     271              : impl TenantSnapshot {
     272              :     /// Collect tenant status to have metrics created out of it.
     273              :     ///
     274              :     /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
     275              :     /// cannot just list timelines here.
     276            0 :     fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
     277            0 :         TenantSnapshot {
     278            0 :             resident_size,
     279            0 :             remote_size: t.remote_size(),
     280            0 :             // Note that this metric is calculated in a separate bgworker
     281            0 :             // Here we only use cached value, which may lag behind the real latest one
     282            0 :             synthetic_size: t.cached_synthetic_size(),
     283            0 :         }
     284            0 :     }
     285              : 
     286            4 :     fn to_metrics(
     287            4 :         &self,
     288            4 :         tenant_id: TenantId,
     289            4 :         now: DateTime<Utc>,
     290            4 :         cached: &Cache,
     291            4 :         metrics: &mut Vec<RawMetric>,
     292            4 :     ) {
     293            4 :         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
     294            4 : 
     295            4 :         let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
     296              : 
     297            4 :         let synthetic_size = {
     298            4 :             let factory = MetricsKey::synthetic_size(tenant_id);
     299            4 :             let mut synthetic_size = self.synthetic_size;
     300            4 : 
     301            4 :             if synthetic_size == 0 {
     302            4 :                 if let Some((_, value)) = cached.get(factory.key()) {
     303            2 :                     // use the latest value from previous session
     304            2 :                     synthetic_size = *value;
     305            2 :                 }
     306            0 :             }
     307              : 
     308            4 :             if synthetic_size != 0 {
     309              :                 // only send non-zeroes because otherwise these show up as errors in logs
     310            2 :                 Some(factory.at(now, synthetic_size))
     311              :             } else {
     312            2 :                 None
     313              :             }
     314              :         };
     315              : 
     316            4 :         metrics.extend(
     317            4 :             [Some(remote_size), Some(resident_size), synthetic_size]
     318            4 :                 .into_iter()
     319            4 :                 .flatten(),
     320            4 :         );
     321            4 :     }
     322              : }
     323              : 
     324              : /// Internal type to make timeline metric production testable.
     325              : ///
     326              : /// As this value type contains all of the information needed from a timeline to produce the
     327              : /// metrics, it can easily be created with different values in test.
     328              : struct TimelineSnapshot {
     329              :     loaded_at: (Lsn, SystemTime),
     330              :     last_record_lsn: Lsn,
     331              :     current_exact_logical_size: Option<u64>,
     332              : }
     333              : 
     334              : impl TimelineSnapshot {
     335              :     /// Collect the metrics from an actual timeline.
     336              :     ///
     337              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     338              :     ///
     339              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     340            0 :     fn collect(
     341            0 :         t: &Arc<crate::tenant::Timeline>,
     342            0 :         ctx: &RequestContext,
     343            0 :     ) -> anyhow::Result<Option<Self>> {
     344            0 :         if !t.is_active() {
     345              :             // no collection for broken or stopping needed, we will still keep the cached values
     346              :             // though at the caller.
     347            0 :             Ok(None)
     348              :         } else {
     349            0 :             let loaded_at = t.loaded_at;
     350            0 :             let last_record_lsn = t.get_last_record_lsn();
     351              : 
     352            0 :             let current_exact_logical_size = {
     353            0 :                 let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
     354            0 :                 let size = span.in_scope(|| {
     355            0 :                     t.get_current_logical_size(
     356            0 :                         crate::tenant::timeline::GetLogicalSizePriority::Background,
     357            0 :                         ctx,
     358            0 :                     )
     359            0 :                 });
     360            0 :                 match size {
     361              :                     // Only send timeline logical size when it is fully calculated.
     362            0 :                     CurrentLogicalSize::Exact(ref size) => Some(size.into()),
     363            0 :                     CurrentLogicalSize::Approximate(_) => None,
     364              :                 }
     365              :             };
     366              : 
     367            0 :             Ok(Some(TimelineSnapshot {
     368            0 :                 loaded_at,
     369            0 :                 last_record_lsn,
     370            0 :                 current_exact_logical_size,
     371            0 :             }))
     372              :         }
     373            0 :     }
     374              : 
     375              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     376           12 :     fn to_metrics(
     377           12 :         &self,
     378           12 :         tenant_id: TenantId,
     379           12 :         timeline_id: TimelineId,
     380           12 :         now: DateTime<Utc>,
     381           12 :         metrics: &mut Vec<RawMetric>,
     382           12 :         cache: &Cache,
     383           12 :     ) {
     384           12 :         let timeline_written_size = u64::from(self.last_record_lsn);
     385           12 : 
     386           12 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     387           12 : 
     388           12 :         let last_stop_time = cache
     389           12 :             .get(written_size_delta_key.key())
     390           12 :             .map(|(until, _val)| {
     391            6 :                 until
     392            6 :                     .incremental_timerange()
     393            6 :                     .expect("never create EventType::Absolute for written_size_delta")
     394            6 :                     .end
     395           12 :             });
     396           12 : 
     397           12 :         let (key, written_size_now) =
     398           12 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     399           12 : 
     400           12 :         // by default, use the last sent written_size as the basis for
     401           12 :         // calculating the delta. if we don't yet have one, use the load time value.
     402           12 :         let prev = cache
     403           12 :             .get(&key)
     404           12 :             .map(|(prev_at, prev)| {
     405            8 :                 // use the prev time from our last incremental update, or default to latest
     406            8 :                 // absolute update on the first round.
     407            8 :                 let prev_at = prev_at
     408            8 :                     .absolute_time()
     409            8 :                     .expect("never create EventType::Incremental for written_size");
     410            8 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     411            8 :                 (*prev_at, *prev)
     412           12 :             })
     413           12 :             .unwrap_or_else(|| {
     414            4 :                 // if we don't have a previous point of comparison, compare to the load time
     415            4 :                 // lsn.
     416            4 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     417            4 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     418           12 :             });
     419           12 : 
     420           12 :         let up_to = now;
     421              : 
     422           12 :         if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
     423            8 :             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
     424            8 :             // written_size_delta
     425            8 :             metrics.push(key_value);
     426            8 :             // written_size
     427            8 :             metrics.push((key, written_size_now));
     428            8 :         } else {
     429            4 :             // the cached value was ahead of us, report zero until we've caught up
     430            4 :             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
     431            4 :             // the cached value was ahead of us, report the same until we've caught up
     432            4 :             metrics.push((key, (written_size_now.0, prev.1)));
     433            4 :         }
     434              : 
     435              :         {
     436           12 :             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
     437           12 :             let current_or_previous = self
     438           12 :                 .current_exact_logical_size
     439           12 :                 .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
     440              : 
     441           12 :             if let Some(size) = current_or_previous {
     442            8 :                 metrics.push(factory.at(now, size));
     443            8 :             }
     444              :         }
     445           12 :     }
     446              : }
     447              : 
     448              : #[cfg(test)]
     449              : mod tests;
     450              : 
     451              : #[cfg(test)]
     452              : pub(crate) use tests::metric_examples;
        

Generated by: LCOV version 2.1-beta