LCOV - differential code coverage report
Current view:  top level - pageserver/src - consumption_metrics.rs (source / functions)
Current:       f6946e90941b557c917ac98cd5a7e9506d180f3e.info  (2023-10-19 02:04:12)
Baseline:      c8637f37369098875162f194f92736355783b050.info  (2023-10-18 20:25:20)

               Coverage      Total      Hit      UBC      CBC
Lines:         80.0 %          185      148       37      148
Functions:     63.0 %           27       17       10       17

           TLA  Line data    Source code
       1                 : //! Periodically collect consumption metrics for all active tenants
       2                 : //! and push them to a HTTP endpoint.
       3                 : use crate::context::{DownloadBehavior, RequestContext};
       4                 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
       5                 : use crate::tenant::tasks::BackgroundLoopKind;
       6                 : use crate::tenant::{mgr, LogicalSizeCalculationCause};
       7                 : use camino::Utf8PathBuf;
       8                 : use consumption_metrics::EventType;
       9                 : use pageserver_api::models::TenantState;
      10                 : use reqwest::Url;
      11                 : use std::collections::HashMap;
      12                 : use std::sync::Arc;
      13                 : use std::time::{Duration, SystemTime};
      14                 : use tracing::*;
      15                 : use utils::id::NodeId;
      16                 : 
      17                 : mod metrics;
      18                 : use metrics::MetricsKey;
      19                 : mod disk_cache;
      20                 : mod upload;
      21                 : 
      22                 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      23                 : 
      24                 : /// Basically a key-value pair, but usually in a Vec except for [`Cache`].
      25                 : ///
       26                 : /// This is as opposed to `consumption_metrics::Event`, which is the externally communicated form.
       27                 : /// The difference is basically the missing idempotency key, which lives only for the duration of
      28                 : /// upload attempts.
      29                 : type RawMetric = (MetricsKey, (EventType, u64));
      30                 : 
      31                 : /// Caches the [`RawMetric`]s
      32                 : ///
      33                 : /// In practice, during startup, last sent values are stored here to be used in calculating new
       34                 : /// ones. After a successful upload, the cache is updated with the newly uploaded values. This used
       35                 : /// to be used for deduplication, but that is no longer needed.
      36                 : type Cache = HashMap<MetricsKey, (EventType, u64)>;
      37                 : 
       38                 : /// Main task that drives metrics collection and upload
      39 CBC           6 : pub async fn collect_metrics(
      40               6 :     metric_collection_endpoint: &Url,
      41               6 :     metric_collection_interval: Duration,
      42               6 :     _cached_metric_collection_interval: Duration,
      43               6 :     synthetic_size_calculation_interval: Duration,
      44               6 :     node_id: NodeId,
      45               6 :     local_disk_storage: Utf8PathBuf,
      46               6 :     ctx: RequestContext,
      47               6 : ) -> anyhow::Result<()> {
      48               6 :     if _cached_metric_collection_interval != Duration::ZERO {
      49 UBC           0 :         tracing::warn!(
      50               0 :             "cached_metric_collection_interval is no longer used, please set it to zero."
      51               0 :         )
      52 CBC           6 :     }
      53                 : 
       54                 :     // spin up background worker that calculates tenant sizes
      55               6 :     let worker_ctx =
      56               6 :         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
      57               6 :     task_mgr::spawn(
      58               6 :         BACKGROUND_RUNTIME.handle(),
      59               6 :         TaskKind::CalculateSyntheticSize,
      60               6 :         None,
      61               6 :         None,
      62               6 :         "synthetic size calculation",
      63               6 :         false,
      64               6 :         async move {
      65               6 :             calculate_synthetic_size_worker(synthetic_size_calculation_interval, &worker_ctx)
      66               6 :                 .instrument(info_span!("synthetic_size_worker"))
      67              31 :                 .await?;
      68               3 :             Ok(())
      69               6 :         },
      70               6 :     );
      71               6 : 
      72               6 :     let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
      73               6 : 
      74               6 :     let cancel = task_mgr::shutdown_token();
      75               6 : 
      76               6 :     let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
      77                 : 
      78               6 :     let mut cached_metrics = tokio::select! {
      79                 :         _ = cancel.cancelled() => return Ok(()),
      80               6 :         ret = restore_and_reschedule => ret,
      81                 :     };
      82                 : 
      83                 :     // define client here to reuse it for all requests
      84               6 :     let client = reqwest::ClientBuilder::new()
      85               6 :         .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
      86               6 :         .build()
      87               6 :         .expect("Failed to create http client with timeout");
      88               6 : 
      89               6 :     let node_id = node_id.to_string();
      90               6 : 
       91               6 :     // reminder: ticker is ready immediately
      92               6 :     let mut ticker = tokio::time::interval(metric_collection_interval);
      93                 : 
      94                 :     loop {
      95              22 :         let tick_at = tokio::select! {
      96                 :             _ = cancel.cancelled() => return Ok(()),
      97              18 :             tick_at = ticker.tick() => tick_at,
      98                 :         };
      99                 : 
      100                 :         // these are point-in-time values, each with its own "now"
     101              18 :         let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
     102                 : 
     103              18 :         if metrics.is_empty() {
     104               3 :             continue;
     105              15 :         }
     106              15 : 
     107              15 :         let metrics = Arc::new(metrics);
     108              15 : 
     109              15 :         // why not race cancellation here? because we are one of the last tasks, and if we are
     110              15 :         // already here, better to try to flush the new values.
     111              15 : 
     112              15 :         let flush = async {
     113              69 :             match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
     114                 :                 Ok(()) => {
     115              14 :                     tracing::debug!("flushed metrics to disk");
     116                 :                 }
     117 UBC           0 :                 Err(e) => {
      118               0 :                     // the idea here is that if someone has created a directory at our path, they
      119               0 :                     // might notice it in the logs before shutdown and remove it
     120               0 :                     tracing::error!("failed to persist metrics to {path:?}: {e:#}");
     121                 :                 }
     122                 :             }
     123 CBC          14 :         };
     124                 : 
     125              15 :         let upload = async {
     126              15 :             let res = upload::upload_metrics(
     127              15 :                 &client,
     128              15 :                 metric_collection_endpoint,
     129              15 :                 &cancel,
     130              15 :                 &node_id,
     131              15 :                 &metrics,
     132              15 :                 &mut cached_metrics,
     133              15 :             )
     134             163 :             .await;
     135              14 :             if let Err(e) = res {
     136                 :                 // serialization error which should never happen
     137 UBC           0 :                 tracing::error!("failed to upload due to {e:#}");
     138 CBC          14 :             }
     139              14 :         };
     140                 : 
     141                 :         // let these run concurrently
     142              15 :         let (_, _) = tokio::join!(flush, upload);
     143                 : 
     144              13 :         crate::tenant::tasks::warn_when_period_overrun(
     145              13 :             tick_at.elapsed(),
     146              13 :             metric_collection_interval,
     147              13 :             BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
     148              13 :         );
     149                 :     }
     150               3 : }
     151                 : 
      152                 : /// Called on the first iteration in an attempt to rejoin the metric uploading schedule from the
      153                 : /// previous pageserver session. The pageserver is supposed to upload at regular intervals regardless of restarts.
     154                 : ///
     155                 : /// Cancellation safe.
     156               6 : async fn restore_and_reschedule(
     157               6 :     path: &Arc<Utf8PathBuf>,
     158               6 :     metric_collection_interval: Duration,
     159               6 : ) -> Cache {
     160               6 :     let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
     161               3 :         Ok(found_some) => {
     162               3 :             // there is no min needed because we write these sequentially in
     163               3 :             // collect_all_metrics
     164               3 :             let earlier_metric_at = found_some
     165               3 :                 .iter()
     166               3 :                 .map(|(_, (et, _))| et.recorded_at())
     167               3 :                 .copied()
     168               3 :                 .next();
     169               3 : 
     170               3 :             let cached = found_some.into_iter().collect::<Cache>();
     171               3 : 
     172               3 :             (cached, earlier_metric_at)
     173                 :         }
     174               3 :         Err(e) => {
     175               3 :             use std::io::{Error, ErrorKind};
     176               3 : 
     177               3 :             let root = e.root_cause();
     178               3 :             let maybe_ioerr = root.downcast_ref::<Error>();
     179               3 :             let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
     180               3 : 
     181               3 :             if !is_not_found {
     182 UBC           0 :                 tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
     183 CBC           3 :             }
     184                 : 
     185               3 :             (HashMap::new(), None)
     186                 :         }
     187                 :     };
     188                 : 
     189               6 :     if let Some(earlier_metric_at) = earlier_metric_at {
     190               3 :         let earlier_metric_at: SystemTime = earlier_metric_at.into();
     191                 : 
     192               3 :         let error = reschedule(earlier_metric_at, metric_collection_interval).await;
     193                 : 
     194               3 :         if let Some(error) = error {
     195               3 :             if error.as_secs() >= 60 {
     196 UBC           0 :                 tracing::info!(
     197               0 :                     error_ms = error.as_millis(),
     198               0 :                     "startup scheduling error due to restart"
     199               0 :                 )
     200 CBC           3 :             }
     201 UBC           0 :         }
     202 CBC           3 :     }
     203                 : 
     204               6 :     cached
     205               6 : }
     206                 : 
     207               3 : async fn reschedule(
     208               3 :     earlier_metric_at: SystemTime,
     209               3 :     metric_collection_interval: Duration,
     210               3 : ) -> Option<Duration> {
     211               3 :     let now = SystemTime::now();
     212               3 :     match now.duration_since(earlier_metric_at) {
     213               3 :         Ok(from_last_send) if from_last_send < metric_collection_interval => {
     214 UBC           0 :             let sleep_for = metric_collection_interval - from_last_send;
     215               0 : 
     216               0 :             let deadline = std::time::Instant::now() + sleep_for;
     217               0 : 
     218               0 :             tokio::time::sleep_until(deadline.into()).await;
     219                 : 
     220               0 :             let now = std::time::Instant::now();
     221               0 : 
      222               0 :             // executor threads might be busy, so measure how far off the deadline we actually woke up
     223               0 :             Some(if now < deadline {
     224               0 :                 deadline - now
     225                 :             } else {
     226               0 :                 now - deadline
     227                 :             })
     228                 :         }
     229 CBC           3 :         Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
     230                 :         Err(_) => {
     231 UBC           0 :             tracing::warn!(
     232               0 :                 ?now,
     233               0 :                 ?earlier_metric_at,
      234               0 :                 "oldest recorded metric is in the future; first values will come out with inconsistent timestamps"
     235               0 :             );
     236               0 :             earlier_metric_at.duration_since(now).ok()
     237                 :         }
     238                 :     }
     239 CBC           3 : }
     240                 : 
      241                 : /// Calculate synthetic size for each active tenant
     242               6 : async fn calculate_synthetic_size_worker(
     243               6 :     synthetic_size_calculation_interval: Duration,
     244               6 :     ctx: &RequestContext,
     245               6 : ) -> anyhow::Result<()> {
     246               6 :     info!("starting calculate_synthetic_size_worker");
     247                 : 
      248                 :     // reminder: ticker is ready immediately
     249               6 :     let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
     250               6 :     let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
     251                 : 
     252                 :     loop {
     253              28 :         let tick_at = tokio::select! {
     254                 :             _ = task_mgr::shutdown_watcher() => return Ok(()),
     255              22 :             tick_at = ticker.tick() => tick_at,
     256                 :         };
     257                 : 
     258              22 :         let tenants = match mgr::list_tenants().await {
     259              22 :             Ok(tenants) => tenants,
     260 UBC           0 :             Err(e) => {
     261               0 :                 warn!("cannot get tenant list: {e:#}");
     262               0 :                 continue;
     263                 :             }
     264                 :         };
     265                 : 
     266 CBC          41 :         for (tenant_id, tenant_state) in tenants {
     267              19 :             if tenant_state != TenantState::Active {
     268 UBC           0 :                 continue;
     269 CBC          19 :             }
     270                 : 
     271              19 :             if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
     272                 :                 // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
     273                 :                 // We can put in some prioritization for consumption metrics.
     274                 :                 // Same for the loop that fetches computed metrics.
     275                 :                 // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
     276                 :                 // which turns out is really handy to understand the system.
     277              19 :                 if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
     278 UBC           0 :                     error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
     279 CBC          19 :                 }
     280 UBC           0 :             }
     281                 :         }
     282                 : 
     283 CBC          22 :         crate::tenant::tasks::warn_when_period_overrun(
     284              22 :             tick_at.elapsed(),
     285              22 :             synthetic_size_calculation_interval,
     286              22 :             BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
     287              22 :         );
     288                 :     }
     289               3 : }
        

Generated by: LCOV version 2.1-beta