LCOV - code coverage report
Current view: top level - pageserver/src - consumption_metrics.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 93.3 % 479 447
Test Date: 2023-09-06 10:18:01 Functions: 75.4 % 61 46

            Line data    Source code
       1              : //!
       2              : //! Periodically collect consumption metrics for all active tenants
       3              : //! and push them to a HTTP endpoint.
       4              : //! Cache metrics to send only the updated ones.
       5              : //!
       6              : use crate::context::{DownloadBehavior, RequestContext};
       7              : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
       8              : use crate::tenant::{mgr, LogicalSizeCalculationCause};
       9              : use anyhow;
      10              : use chrono::{DateTime, Utc};
      11              : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
      12              : use pageserver_api::models::TenantState;
      13              : use reqwest::Url;
      14              : use serde::Serialize;
      15              : use serde_with::{serde_as, DisplayFromStr};
      16              : use std::collections::HashMap;
      17              : use std::sync::Arc;
      18              : use std::time::{Duration, SystemTime};
      19              : use tracing::*;
      20              : use utils::id::{NodeId, TenantId, TimelineId};
      21              : use utils::lsn::Lsn;
      22              : 
      23              : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      24              : 
      25              : #[serde_as]
      26          203 : #[derive(Serialize, Debug, Clone, Copy)]
      27              : struct Ids {
      28              :     #[serde_as(as = "DisplayFromStr")]
      29              :     tenant_id: TenantId,
      30              :     #[serde_as(as = "Option<DisplayFromStr>")]
      31              :     #[serde(skip_serializing_if = "Option::is_none")]
      32              :     timeline_id: Option<TimelineId>,
      33              : }
      34              : 
      35              : /// Key that uniquely identifies the object this metric describes.
      36          163 : #[derive(Debug, Clone, PartialEq, Eq, Hash)]
      37              : struct MetricsKey {
      38              :     tenant_id: TenantId,
      39              :     timeline_id: Option<TimelineId>,
      40              :     metric: &'static str,
      41              : }
      42              : 
      43              : impl MetricsKey {
      44          212 :     const fn absolute_values(self) -> AbsoluteValueFactory {
      45          212 :         AbsoluteValueFactory(self)
      46          212 :     }
      47           53 :     const fn incremental_values(self) -> IncrementalValueFactory {
      48           53 :         IncrementalValueFactory(self)
      49           53 :     }
      50              : }
      51              : 
      52              : /// Helper type which each individual metric kind can return to produce only absolute values.
      53              : struct AbsoluteValueFactory(MetricsKey);
      54              : 
      55              : impl AbsoluteValueFactory {
      56          212 :     fn at(self, time: DateTime<Utc>, val: u64) -> (MetricsKey, (EventType, u64)) {
      57          212 :         let key = self.0;
      58          212 :         (key, (EventType::Absolute { time }, val))
      59          212 :     }
      60              : }
      61              : 
      62              : /// Helper type which each individual metric kind can return to produce only incremental values.
      63              : struct IncrementalValueFactory(MetricsKey);
      64              : 
      65              : impl IncrementalValueFactory {
      66              :     #[allow(clippy::wrong_self_convention)]
      67           53 :     fn from_previous_up_to(
      68           53 :         self,
      69           53 :         prev_end: DateTime<Utc>,
      70           53 :         up_to: DateTime<Utc>,
      71           53 :         val: u64,
      72           53 :     ) -> (MetricsKey, (EventType, u64)) {
      73           53 :         let key = self.0;
      74           53 :         // cannot assert prev_end < up_to because these are realtime clock based
      75           53 :         (
      76           53 :             key,
      77           53 :             (
      78           53 :                 EventType::Incremental {
      79           53 :                     start_time: prev_end,
      80           53 :                     stop_time: up_to,
      81           53 :                 },
      82           53 :                 val,
      83           53 :             ),
      84           53 :         )
      85           53 :     }
      86              : 
      87           49 :     fn key(&self) -> &MetricsKey {
      88           49 :         &self.0
      89           49 :     }
      90              : }
      91              : 
      92              : // the static part of a MetricsKey
      93              : impl MetricsKey {
      94              :     /// Absolute value of [`Timeline::get_last_record_lsn`].
      95              :     ///
      96              :     /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
      97           54 :     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
      98           54 :         MetricsKey {
      99           54 :             tenant_id,
     100           54 :             timeline_id: Some(timeline_id),
     101           54 :             metric: "written_size",
     102           54 :         }
     103           54 :         .absolute_values()
     104           54 :     }
     105              : 
     106              :     /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
     107              :     /// previously sent, starting from the previously sent incremental time range ending at the
     108              :     /// latest absolute measurement.
     109           53 :     const fn written_size_delta(
     110           53 :         tenant_id: TenantId,
     111           53 :         timeline_id: TimelineId,
     112           53 :     ) -> IncrementalValueFactory {
     113           53 :         MetricsKey {
     114           53 :             tenant_id,
     115           53 :             timeline_id: Some(timeline_id),
     116           53 :             // the name here is correctly about data not size, because that is what is wanted by
     117           53 :             // downstream pipeline
     118           53 :             metric: "written_data_bytes_delta",
     119           53 :         }
     120           53 :         .incremental_values()
     121           53 :     }
     122              : 
     123              :     /// Exact [`Timeline::get_current_logical_size`].
     124              :     ///
     125              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     126           52 :     const fn timeline_logical_size(
     127           52 :         tenant_id: TenantId,
     128           52 :         timeline_id: TimelineId,
     129           52 :     ) -> AbsoluteValueFactory {
     130           52 :         MetricsKey {
     131           52 :             tenant_id,
     132           52 :             timeline_id: Some(timeline_id),
     133           52 :             metric: "timeline_logical_size",
     134           52 :         }
     135           52 :         .absolute_values()
     136           52 :     }
     137              : 
     138              :     /// [`Tenant::remote_size`]
     139              :     ///
     140              :     /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     141           40 :     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     142           40 :         MetricsKey {
     143           40 :             tenant_id,
     144           40 :             timeline_id: None,
     145           40 :             metric: "remote_storage_size",
     146           40 :         }
     147           40 :         .absolute_values()
     148           40 :     }
     149              : 
     150              :     /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
     151              :     ///
     152              :     /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     153           40 :     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     154           40 :         MetricsKey {
     155           40 :             tenant_id,
     156           40 :             timeline_id: None,
     157           40 :             metric: "resident_size",
     158           40 :         }
     159           40 :         .absolute_values()
     160           40 :     }
     161              : 
     162              :     /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
     163              :     ///
     164              :     /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     165           26 :     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
     166           26 :         MetricsKey {
     167           26 :             tenant_id,
     168           26 :             timeline_id: None,
     169           26 :             metric: "synthetic_storage_size",
     170           26 :         }
     171           26 :         .absolute_values()
     172           26 :     }
     173              : }
     174              : 
     175              : /// Main thread that serves metrics collection
     176            4 : pub async fn collect_metrics(
     177            4 :     metric_collection_endpoint: &Url,
     178            4 :     metric_collection_interval: Duration,
     179            4 :     cached_metric_collection_interval: Duration,
     180            4 :     synthetic_size_calculation_interval: Duration,
     181            4 :     node_id: NodeId,
     182            4 :     ctx: RequestContext,
     183            4 : ) -> anyhow::Result<()> {
     184            4 :     let mut ticker = tokio::time::interval(metric_collection_interval);
     185            4 :     info!("starting collect_metrics");
     186              : 
     187              :     // spin up background worker that calculates tenant sizes
     188            4 :     let worker_ctx =
     189            4 :         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
     190            4 :     task_mgr::spawn(
     191            4 :         BACKGROUND_RUNTIME.handle(),
     192            4 :         TaskKind::CalculateSyntheticSize,
     193            4 :         None,
     194            4 :         None,
     195            4 :         "synthetic size calculation",
     196            4 :         false,
     197            4 :         async move {
     198            4 :             calculate_synthetic_size_worker(synthetic_size_calculation_interval, &worker_ctx)
     199            4 :                 .instrument(info_span!("synthetic_size_worker"))
     200           19 :                 .await?;
     201            1 :             Ok(())
     202            4 :         },
     203            4 :     );
     204            4 : 
     205            4 :     // define client here to reuse it for all requests
     206            4 :     let client = reqwest::ClientBuilder::new()
     207            4 :         .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
     208            4 :         .build()
     209            4 :         .expect("Failed to create http client with timeout");
     210            4 :     let mut cached_metrics = HashMap::new();
     211            4 :     let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
     212              : 
     213           47 :     loop {
     214           91 :         tokio::select! {
     215              :             _ = task_mgr::shutdown_watcher() => {
     216            1 :                 info!("collect_metrics received cancellation request");
     217              :                 return Ok(());
     218              :             },
     219           43 :             tick_at = ticker.tick() => {
     220              : 
     221              :                 // send cached metrics every cached_metric_collection_interval
     222              :                 let send_cached = prev_iteration_time.elapsed() >= cached_metric_collection_interval;
     223              : 
     224              :                 if send_cached {
     225              :                     prev_iteration_time = std::time::Instant::now();
     226              :                 }
     227              : 
     228              :                 collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx, send_cached).await;
     229              : 
     230              :                 crate::tenant::tasks::warn_when_period_overrun(
     231              :                     tick_at.elapsed(),
     232              :                     metric_collection_interval,
     233              :                     "consumption_metrics_collect_metrics",
     234              :                 );
     235              :             }
     236           47 :         }
     237           47 :     }
     238            1 : }
     239              : 
     240              : /// One iteration of metrics collection
     241              : ///
     242              : /// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`.
     243              : /// Cache metrics to avoid sending the same metrics multiple times.
     244              : ///
     245              : /// This function handles all errors internally
     246              : /// and doesn't break iteration if just one tenant fails.
     247              : ///
     248              : /// TODO
     249              : /// - refactor this function (chunking+sending part) to reuse it in proxy module;
     250           43 : async fn collect_metrics_iteration(
     251           43 :     client: &reqwest::Client,
     252           43 :     cached_metrics: &mut HashMap<MetricsKey, (EventType, u64)>,
     253           43 :     metric_collection_endpoint: &reqwest::Url,
     254           43 :     node_id: NodeId,
     255           43 :     ctx: &RequestContext,
     256           43 :     send_cached: bool,
     257           43 : ) {
     258           43 :     let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
     259            0 :     trace!(
     260            0 :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     261            0 :         metric_collection_endpoint
     262            0 :     );
     263              : 
     264              :     // get list of tenants
     265           43 :     let tenants = match mgr::list_tenants().await {
     266           43 :         Ok(tenants) => tenants,
     267            0 :         Err(err) => {
     268            0 :             error!("failed to list tenants: {:?}", err);
     269            0 :             return;
     270              :         }
     271              :     };
     272              : 
     273              :     // iterate through list of Active tenants and collect metrics
     274           83 :     for (tenant_id, tenant_state) in tenants {
     275           40 :         if tenant_state != TenantState::Active {
     276            0 :             continue;
     277           40 :         }
     278              : 
     279           40 :         let tenant = match mgr::get_tenant(tenant_id, true).await {
     280           40 :             Ok(tenant) => tenant,
     281            0 :             Err(err) => {
     282              :                 // It is possible that tenant was deleted between
     283              :                 // `list_tenants` and `get_tenant`, so just warn about it.
     284            0 :                 warn!("failed to get tenant {tenant_id:?}: {err:?}");
     285            0 :                 continue;
     286              :             }
     287              :         };
     288              : 
     289           40 :         let mut tenant_resident_size = 0;
     290              : 
     291              :         // iterate through list of timelines in tenant
     292           46 :         for timeline in tenant.list_timelines() {
     293              :             // collect per-timeline metrics only for active timelines
     294              : 
     295           46 :             let timeline_id = timeline.timeline_id;
     296           46 : 
     297           46 :             match TimelineSnapshot::collect(&timeline, ctx) {
     298           46 :                 Ok(Some(snap)) => {
     299           46 :                     snap.to_metrics(
     300           46 :                         tenant_id,
     301           46 :                         timeline_id,
     302           46 :                         Utc::now(),
     303           46 :                         &mut current_metrics,
     304           46 :                         cached_metrics,
     305           46 :                     );
     306           46 :                 }
     307            0 :                 Ok(None) => {}
     308            0 :                 Err(e) => {
     309            0 :                     error!(
     310            0 :                         "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
     311            0 :                         timeline.timeline_id
     312            0 :                     );
     313            0 :                     continue;
     314              :                 }
     315              :             }
     316              : 
     317           46 :             tenant_resident_size += timeline.resident_physical_size();
     318              :         }
     319              : 
     320           40 :         current_metrics
     321           40 :             .push(MetricsKey::remote_storage_size(tenant_id).at(Utc::now(), tenant.remote_size()));
     322           40 : 
     323           40 :         current_metrics
     324           40 :             .push(MetricsKey::resident_size(tenant_id).at(Utc::now(), tenant_resident_size));
     325           40 : 
     326           40 :         // Note that this metric is calculated in a separate bgworker
     327           40 :         // Here we only use cached value, which may lag behind the real latest one
     328           40 :         let synthetic_size = tenant.cached_synthetic_size();
     329           40 : 
     330           40 :         if synthetic_size != 0 {
     331           26 :             // only send non-zeroes because otherwise these show up as errors in logs
     332           26 :             current_metrics
     333           26 :                 .push(MetricsKey::synthetic_size(tenant_id).at(Utc::now(), synthetic_size));
     334           26 :         }
     335              :     }
     336              : 
     337              :     // Filter metrics, unless we want to send all metrics, including cached ones.
     338              :     // See: https://github.com/neondatabase/neon/issues/3485
     339           43 :     if !send_cached {
     340          244 :         current_metrics.retain(|(curr_key, (kind, curr_val))| {
     341          244 :             if kind.is_incremental() {
     342              :                 // incremental values (currently only written_size_delta) should not get any cache
     343              :                 // deduplication because they will be used by upstream for "is still alive."
     344           46 :                 true
     345              :             } else {
     346          198 :                 match cached_metrics.get(curr_key) {
     347           50 :                     Some((_, val)) => val != curr_val,
     348          148 :                     None => true,
     349              :                 }
     350              :             }
     351          244 :         });
     352           43 :     }
     353              : 
     354           43 :     if current_metrics.is_empty() {
     355            0 :         trace!("no new metrics to send");
     356            3 :         return;
     357           40 :     }
     358           40 : 
     359           40 :     // Send metrics.
     360           40 :     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     361           40 :     let chunks = current_metrics.chunks(CHUNK_SIZE);
     362           40 : 
     363           40 :     let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
     364           40 : 
     365           40 :     let node_id = node_id.to_string();
     366              : 
     367           80 :     for chunk in chunks {
     368           40 :         chunk_to_send.clear();
     369           40 : 
     370           40 :         // enrich metrics with type, timestamp and idempotency key before sending
     371          203 :         chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
     372          203 :             kind: *when,
     373          203 :             metric: curr_key.metric,
     374          203 :             idempotency_key: idempotency_key(&node_id),
     375          203 :             value: *curr_val,
     376          203 :             extra: Ids {
     377          203 :                 tenant_id: curr_key.tenant_id,
     378          203 :                 timeline_id: curr_key.timeline_id,
     379          203 :             },
     380          203 :         }));
     381              : 
     382              :         const MAX_RETRIES: u32 = 3;
     383              : 
     384           40 :         for attempt in 0..MAX_RETRIES {
     385           40 :             let res = client
     386           40 :                 .post(metric_collection_endpoint.clone())
     387           40 :                 .json(&EventChunk {
     388           40 :                     events: (&chunk_to_send).into(),
     389           40 :                 })
     390           40 :                 .send()
     391          160 :                 .await;
     392              : 
     393            0 :             match res {
     394           40 :                 Ok(res) => {
     395           40 :                     if res.status().is_success() {
     396              :                         // update cached metrics after they were sent successfully
     397           40 :                         for (curr_key, curr_val) in chunk.iter() {
     398           40 :                             cached_metrics.insert(curr_key.clone(), *curr_val);
     399           40 :                         }
     400              :                     } else {
     401           28 :                         error!("metrics endpoint refused the sent metrics: {:?}", res);
     402           28 :                         for metric in chunk_to_send
     403           28 :                             .iter()
     404          163 :                             .filter(|metric| metric.value > (1u64 << 40))
     405              :                         {
     406              :                             // Report if the metric value is suspiciously large
     407            0 :                             error!("potentially abnormal metric value: {:?}", metric);
     408              :                         }
     409              :                     }
     410           40 :                     break;
     411              :                 }
     412            0 :                 Err(err) if err.is_timeout() => {
     413            0 :                     error!(attempt, "timeout sending metrics, retrying immediately");
     414            0 :                     continue;
     415              :                 }
     416            0 :                 Err(err) => {
     417            0 :                     error!(attempt, ?err, "failed to send metrics");
     418            0 :                     break;
     419              :                 }
     420              :             }
     421              :         }
     422              :     }
     423           43 : }
     424              : 
     425              : /// Internal type to make timeline metric production testable.
     426              : ///
     427              : /// As this value type contains all of the information needed from a timeline to produce the
     428              : /// metrics, it can easily be created with different values in test.
     429              : struct TimelineSnapshot {
     430              :     loaded_at: (Lsn, SystemTime),
     431              :     last_record_lsn: Lsn,
     432              :     current_exact_logical_size: Option<u64>,
     433              : }
     434              : 
     435              : impl TimelineSnapshot {
     436              :     /// Collect the metrics from an actual timeline.
     437              :     ///
     438              :     /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
     439              :     ///
     440              :     /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     441           46 :     fn collect(
     442           46 :         t: &Arc<crate::tenant::Timeline>,
     443           46 :         ctx: &RequestContext,
     444           46 :     ) -> anyhow::Result<Option<Self>> {
     445           46 :         use anyhow::Context;
     446           46 : 
     447           46 :         if !t.is_active() {
     448              :             // no collection for broken or stopping needed, we will still keep the cached values
     449              :             // though at the caller.
     450            0 :             Ok(None)
     451              :         } else {
     452           46 :             let loaded_at = t.loaded_at;
     453           46 :             let last_record_lsn = t.get_last_record_lsn();
     454              : 
     455           46 :             let current_exact_logical_size = {
     456           46 :                 let span = info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
     457           46 :                 let res = span
     458           46 :                     .in_scope(|| t.get_current_logical_size(ctx))
     459           46 :                     .context("get_current_logical_size");
     460           46 :                 match res? {
     461              :                     // Only send timeline logical size when it is fully calculated.
     462           46 :                     (size, is_exact) if is_exact => Some(size),
     463            0 :                     (_, _) => None,
     464              :                 }
     465              :             };
     466              : 
     467           46 :             Ok(Some(TimelineSnapshot {
     468           46 :                 loaded_at,
     469           46 :                 last_record_lsn,
     470           46 :                 current_exact_logical_size,
     471           46 :             }))
     472              :         }
     473           46 :     }
     474              : 
     475              :     /// Produce the timeline consumption metrics into the `metrics` argument.
     476           49 :     fn to_metrics(
     477           49 :         &self,
     478           49 :         tenant_id: TenantId,
     479           49 :         timeline_id: TimelineId,
     480           49 :         now: DateTime<Utc>,
     481           49 :         metrics: &mut Vec<(MetricsKey, (EventType, u64))>,
     482           49 :         cache: &HashMap<MetricsKey, (EventType, u64)>,
     483           49 :     ) {
     484           49 :         let timeline_written_size = u64::from(self.last_record_lsn);
     485           49 : 
     486           49 :         let (key, written_size_now) =
     487           49 :             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
     488           49 : 
     489           49 :         // last_record_lsn can only go up, right now at least, TODO: #2592 or related
     490           49 :         // features might change this.
     491           49 : 
     492           49 :         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
     493           49 : 
     494           49 :         // use this when available, because in a stream of incremental values, it will be
     495           49 :         // accurate whereas when last_record_lsn stops moving, we will only cache the last
     496           49 :         // one of those.
     497           49 :         let last_stop_time = cache
     498           49 :             .get(written_size_delta_key.key())
     499           49 :             .map(|(until, _val)| {
     500           16 :                 until
     501           16 :                     .incremental_timerange()
     502           16 :                     .expect("never create EventType::Absolute for written_size_delta")
     503           16 :                     .end
     504           49 :             });
     505           49 : 
     506           49 :         // by default, use the last sent written_size as the basis for
     507           49 :         // calculating the delta. if we don't yet have one, use the load time value.
     508           49 :         let prev = cache
     509           49 :             .get(&key)
     510           49 :             .map(|(prev_at, prev)| {
     511           17 :                 // use the prev time from our last incremental update, or default to latest
     512           17 :                 // absolute update on the first round.
     513           17 :                 let prev_at = prev_at
     514           17 :                     .absolute_time()
     515           17 :                     .expect("never create EventType::Incremental for written_size");
     516           17 :                 let prev_at = last_stop_time.unwrap_or(prev_at);
     517           17 :                 (*prev_at, *prev)
     518           49 :             })
     519           49 :             .unwrap_or_else(|| {
     520           32 :                 // if we don't have a previous point of comparison, compare to the load time
     521           32 :                 // lsn.
     522           32 :                 let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
     523           32 :                 (DateTime::from(*loaded_at), disk_consistent_lsn.0)
     524           49 :             });
     525              : 
     526              :         // written_data_bytes_delta
     527              :         metrics.extend(
     528           49 :             if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
     529           49 :                 let up_to = written_size_now
     530           49 :                     .0
     531           49 :                     .absolute_time()
     532           49 :                     .expect("never create EventType::Incremental for written_size");
     533           49 :                 let key_value = written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta);
     534           49 :                 Some(key_value)
     535              :             } else {
     536            0 :                 None
     537              :             },
     538              :         );
     539              : 
     540              :         // written_size
     541           49 :         metrics.push((key, written_size_now));
     542              : 
     543           49 :         if let Some(size) = self.current_exact_logical_size {
     544           49 :             metrics.push(MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, size));
     545           49 :         }
     546           49 :     }
     547              : }
     548              : /// Calculate the synthetic size of every active tenant, once per tick of `synthetic_size_calculation_interval`.
     549              : /// Runs until `task_mgr::shutdown_watcher()` resolves and then returns `Ok(())`; per-tenant failures are only logged.
     550            4 : pub async fn calculate_synthetic_size_worker(
     551            4 :     synthetic_size_calculation_interval: Duration,
     552            4 :     ctx: &RequestContext,
     553            4 : ) -> anyhow::Result<()> {
     554            4 :     info!("starting calculate_synthetic_size_worker");
     555              : 
     556            4 :     let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
     557              : 
     558              :     loop {
     559           21 :         tokio::select! {
     560              :             _ = task_mgr::shutdown_watcher() => {
     561              :                 return Ok(());
     562              :             },
     563           17 :             tick_at = ticker.tick() => {
     564              :                 // a failed listing only skips this round; the loop waits for the next tick
     565              :                 let tenants = match mgr::list_tenants().await {
     566              :                     Ok(tenants) => tenants,
     567              :                     Err(e) => {
     568            0 :                         warn!("cannot get tenant list: {e:#}");
     569              :                         continue;
     570              :                     }
     571              :                 };
     572              :                 // walk the tenant list and calculate the synthetic size of each Active tenant
     573              :                 for (tenant_id, tenant_state) in tenants {
     574              : 
     575              :                     if tenant_state != TenantState::Active {
     576              :                         continue;
     577              :                     }
     578              :                     // a failed lookup is skipped without logging (NOTE(review): presumably the tenant went away since listing; confirm)
     579              :                     if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
     580              :                     {
     581              :                         if let Err(e) = tenant.calculate_synthetic_size(
     582              :                             LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
     583              :                             ctx).await {
     584            0 :                             error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
     585              :                         }
     586              :                     }
     587              : 
     588              :                 }
     589              :                 // report how long this round took (time since `tick_at`) against the configured interval
     590              :                 crate::tenant::tasks::warn_when_period_overrun(
     591              :                     tick_at.elapsed(),
     592              :                     synthetic_size_calculation_interval,
     593              :                     "consumption_metrics_synthetic_size_worker",
     594              :                 );
     595              :             }
     596              :         }
     597              :     }
     598            1 : }
     599              : 
     600              : #[cfg(test)]
     601              : mod tests {
     602              :     use std::collections::HashMap;
     603              : 
     604              :     use std::time::SystemTime;
     605              :     use utils::{
     606              :         id::{TenantId, TimelineId},
     607              :         lsn::Lsn,
     608              :     };
     609              : 
     610              :     use crate::consumption_metrics::MetricsKey;
     611              : 
     612              :     use super::TimelineSnapshot;
     613              :     use chrono::{DateTime, Utc};
     614              :     /// With an empty cache, the written_size delta event is expected to span from the snapshot's load time up to `now`, with a delta of 0.
     615            1 :     #[test]
     616            1 :     fn startup_collected_timeline_metrics_before_advancing() {
     617            1 :         let tenant_id = TenantId::generate();
     618            1 :         let timeline_id = TimelineId::generate();
     619            1 : 
     620            1 :         let mut metrics = Vec::new();
     621            1 :         let cache = HashMap::new();
     622            1 :         // the snapshot uses this one LSN for both `loaded_at` and `last_record_lsn`
     623            1 :         let initdb_lsn = Lsn(0x10000);
     624            1 :         let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
     625            1 : 
     626            1 :         let snap = TimelineSnapshot {
     627            1 :             loaded_at: (disk_consistent_lsn, SystemTime::now()),
     628            1 :             last_record_lsn: disk_consistent_lsn,
     629            1 :             current_exact_logical_size: Some(0x42000),
     630            1 :         };
     631            1 : 
     632            1 :         let now = DateTime::<Utc>::from(SystemTime::now());
     633            1 : 
     634            1 :         snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
     635            1 :         // expected order: written_size delta, absolute written_size, then the logical size
     636            1 :         assert_eq!(
     637            1 :             metrics,
     638            1 :             &[
     639            1 :                 MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
     640            1 :                     snap.loaded_at.1.into(),
     641            1 :                     now,
     642            1 :                     0
     643            1 :                 ),
     644            1 :                 MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
     645            1 :                 MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
     646            1 :             ]
     647            1 :         );
     648            1 :     }
     649              :     /// With a cached written_size from `before`, the delta event is expected to span from `before` up to `now`, with a delta of 0.
     650            1 :     #[test]
     651            1 :     fn startup_collected_timeline_metrics_second_round() {
     652            1 :         let tenant_id = TenantId::generate();
     653            1 :         let timeline_id = TimelineId::generate();
     654            1 : 
     655            1 :         let [now, before, init] = time_backwards();
     656            1 :         // `init` stays a SystemTime because it is stored in `loaded_at` as-is
     657            1 :         let now = DateTime::<Utc>::from(now);
     658            1 :         let before = DateTime::<Utc>::from(before);
     659            1 : 
     660            1 :         let initdb_lsn = Lsn(0x10000);
     661            1 :         let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
     662            1 : 
     663            1 :         let mut metrics = Vec::new();
     664            1 :         let cache = HashMap::from([
     665            1 :             MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
     666            1 :         ]);
     667            1 : 
     668            1 :         let snap = TimelineSnapshot {
     669            1 :             loaded_at: (disk_consistent_lsn, init),
     670            1 :             last_record_lsn: disk_consistent_lsn,
     671            1 :             current_exact_logical_size: Some(0x42000),
     672            1 :         };
     673            1 : 
     674            1 :         snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
     675            1 :         // the delta is expected to start at the cached `before`, not at the load time `init`
     676            1 :         assert_eq!(
     677            1 :             metrics,
     678            1 :             &[
     679            1 :                 MetricsKey::written_size_delta(tenant_id, timeline_id)
     680            1 :                     .from_previous_up_to(before, now, 0),
     681            1 :                 MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
     682            1 :                 MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
     683            1 :             ]
     684            1 :         );
     685            1 :     }
     686              :     /// With both a written_size and a written_size_delta cached, the new delta is expected to start where the cached delta ended (`just_before`).
     687            1 :     #[test]
     688            1 :     fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
     689            1 :         let tenant_id = TenantId::generate();
     690            1 :         let timeline_id = TimelineId::generate();
     691            1 : 
     692            1 :         let [now, just_before, before, init] = time_backwards();
     693            1 :         // `init` stays a SystemTime because it is stored in `loaded_at` as-is
     694            1 :         let now = DateTime::<Utc>::from(now);
     695            1 :         let just_before = DateTime::<Utc>::from(just_before);
     696            1 :         let before = DateTime::<Utc>::from(before);
     697            1 : 
     698            1 :         let initdb_lsn = Lsn(0x10000);
     699            1 :         let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
     700            1 : 
     701            1 :         let mut metrics = Vec::new();
     702            1 :         let cache = HashMap::from([
     703            1 :             // at t=before was the last time the last_record_lsn changed
     704            1 :             MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
     705            1 :             // end time of this event is used for the next ones
     706            1 :             MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
     707            1 :                 before,
     708            1 :                 just_before,
     709            1 :                 0,
     710            1 :             ),
     711            1 :         ]);
     712            1 : 
     713            1 :         let snap = TimelineSnapshot {
     714            1 :             loaded_at: (disk_consistent_lsn, init),
     715            1 :             last_record_lsn: disk_consistent_lsn,
     716            1 :             current_exact_logical_size: Some(0x42000),
     717            1 :         };
     718            1 : 
     719            1 :         snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
     720            1 :         // the delta is expected to start at `just_before`, not at `before` where written_size was cached
     721            1 :         assert_eq!(
     722            1 :             metrics,
     723            1 :             &[
     724            1 :                 MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
     725            1 :                     just_before,
     726            1 :                     now,
     727            1 :                     0
     728            1 :                 ),
     729            1 :                 MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
     730            1 :                 MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
     731            1 :             ]
     732            1 :         );
     733            1 :     }
     734              :     /// Returns `N` timestamps going backwards: index 0 is `SystemTime::now()`, index `k` is `k` seconds earlier. Panics if `N` is 0.
     735            2 :     fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
     736            2 :         let mut times = [std::time::SystemTime::UNIX_EPOCH; N];
     737            2 :         times[0] = std::time::SystemTime::now();
     738            7 :         for behind in 1..N {
     739            5 :             times[behind] = times[0] - std::time::Duration::from_secs(behind as u64);
     740            5 :         }
     741              : 
     742            2 :         times
     743            2 :     }
     744              : }
        

Generated by: LCOV version 2.1-beta