LCOV - code coverage report
Current view: top level - pageserver/src - consumption_metrics.rs (source / functions)
Test:      c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info
Test Date: 2024-02-12 20:26:03
Coverage:  Lines: 79.7 % (169 of 212 hit)    Functions: 69.0 % (20 of 29 hit)

            Line data    Source code
       1              : //! Periodically collect consumption metrics for all active tenants
       2              : //! and push them to a HTTP endpoint.
       3              : use crate::context::{DownloadBehavior, RequestContext};
       4              : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
       5              : use crate::tenant::tasks::BackgroundLoopKind;
       6              : use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
       7              : use camino::Utf8PathBuf;
       8              : use consumption_metrics::EventType;
       9              : use pageserver_api::models::TenantState;
      10              : use reqwest::Url;
      11              : use std::collections::HashMap;
      12              : use std::sync::Arc;
      13              : use std::time::{Duration, SystemTime};
      14              : use tokio::time::Instant;
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::*;
      17              : use utils::id::NodeId;
      18              : 
      19              : mod metrics;
      20              : use metrics::MetricsKey;
      21              : mod disk_cache;
      22              : mod upload;
      23              : 
      24              : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      25              : 
       26              : /// Basically a key-value pair; usually carried in a Vec, except in [`Cache`].
      27              : ///
      28              : /// This is as opposed to `consumption_metrics::Event` which is the externally communicated form.
      29              : /// Difference is basically the missing idempotency key, which lives only for the duration of
      30              : /// upload attempts.
      31              : type RawMetric = (MetricsKey, (EventType, u64));
      32              : 
      33              : /// Caches the [`RawMetric`]s
      34              : ///
       35              : /// In practice, during startup the last sent values are restored here and used to calculate
       36              : /// new ones. After a successful upload, the cache is updated with the values just sent. This
       37              : /// used to be used for deduplication as well, but that is no longer needed.
      38              : type Cache = HashMap<MetricsKey, (EventType, u64)>;
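
A side note on the shape of these aliases: collecting a Vec of RawMetric pairs into the Cache (as restore_and_reschedule does below) deduplicates by key, with the last occurrence winning. A minimal sketch, with u32/u64 standing in for MetricsKey and the (EventType, u64) payload:

    use std::collections::HashMap;

    fn main() {
        // Vec<RawMetric>-shaped data: key 1 appears twice.
        let raw: Vec<(u32, u64)> = vec![(1, 10), (2, 20), (1, 11)];
        // Collecting (K, V) pairs into a HashMap keeps the last value per key.
        let cache: HashMap<u32, u64> = raw.into_iter().collect();
        assert_eq!(cache.len(), 2);
        assert_eq!(cache[&1], 11);
    }
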
      39              : 
       40              : /// Main task that performs metrics collection
      41              : #[allow(clippy::too_many_arguments)]
      42            6 : pub async fn collect_metrics(
      43            6 :     metric_collection_endpoint: &Url,
      44            6 :     metric_collection_interval: Duration,
      45            6 :     _cached_metric_collection_interval: Duration,
      46            6 :     synthetic_size_calculation_interval: Duration,
      47            6 :     node_id: NodeId,
      48            6 :     local_disk_storage: Utf8PathBuf,
      49            6 :     cancel: CancellationToken,
      50            6 :     ctx: RequestContext,
      51            6 : ) -> anyhow::Result<()> {
      52            6 :     if _cached_metric_collection_interval != Duration::ZERO {
      53            0 :         tracing::warn!(
      54            0 :             "cached_metric_collection_interval is no longer used, please set it to zero."
      55            0 :         )
      56            6 :     }
      57              : 
       58              :     // spin up a background worker that calculates tenant sizes
      59            6 :     let worker_ctx =
      60            6 :         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
      61            6 :     task_mgr::spawn(
      62            6 :         BACKGROUND_RUNTIME.handle(),
      63            6 :         TaskKind::CalculateSyntheticSize,
      64            6 :         None,
      65            6 :         None,
      66            6 :         "synthetic size calculation",
      67            6 :         false,
      68            6 :         async move {
      69            6 :             calculate_synthetic_size_worker(
      70            6 :                 synthetic_size_calculation_interval,
      71            6 :                 &cancel,
      72            6 :                 &worker_ctx,
      73            6 :             )
      74            6 :             .instrument(info_span!("synthetic_size_worker"))
      75           26 :             .await?;
      76            3 :             Ok(())
      77            6 :         },
      78            6 :     );
      79            6 : 
      80            6 :     let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
      81            6 : 
      82            6 :     let cancel = task_mgr::shutdown_token();
      83            6 : 
      84            6 :     let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
      85              : 
      86            6 :     let mut cached_metrics = tokio::select! {
      87              :         _ = cancel.cancelled() => return Ok(()),
      88            6 :         ret = restore_and_reschedule => ret,
      89              :     };
      90              : 
      91              :     // define client here to reuse it for all requests
      92            6 :     let client = reqwest::ClientBuilder::new()
      93            6 :         .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
      94            6 :         .build()
      95            6 :         .expect("Failed to create http client with timeout");
      96            6 : 
      97            6 :     let node_id = node_id.to_string();
      98              : 
      99           22 :     loop {
     100           22 :         let started_at = Instant::now();
     101              : 
      102              :         // these are point-in-time values, each captured at its own "now"
     103           22 :         let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
     104              : 
     105           22 :         let metrics = Arc::new(metrics);
     106           22 : 
      107           22 :         // why not race cancellation here? because we are one of the last tasks to shut
      108           22 :         // down, and if we got this far, it is better to try to flush the new values.
     109           22 : 
     110           22 :         let flush = async {
     111           68 :             match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
     112              :                 Ok(()) => {
     113           21 :                     tracing::debug!("flushed metrics to disk");
     114              :                 }
     115            0 :                 Err(e) => {
      116            0 :                     // the idea is that if someone creates a directory at our path, they might
      117            0 :                     // notice it in the logs before shutdown and remove it
     118            0 :                     tracing::error!("failed to persist metrics to {path:?}: {e:#}");
     119              :                 }
     120              :             }
     121           21 :         };
     122              : 
     123           22 :         let upload = async {
     124           22 :             let res = upload::upload_metrics(
     125           22 :                 &client,
     126           22 :                 metric_collection_endpoint,
     127           22 :                 &cancel,
     128           22 :                 &node_id,
     129           22 :                 &metrics,
     130           22 :                 &mut cached_metrics,
     131           22 :             )
     132          186 :             .await;
     133           21 :             if let Err(e) = res {
      134              :                 // a serialization error, which should never happen
     135            0 :                 tracing::error!("failed to upload due to {e:#}");
     136           21 :             }
     137           21 :         };
     138              : 
     139              :         // let these run concurrently
     140          214 :         let (_, _) = tokio::join!(flush, upload);
     141              : 
     142           20 :         crate::tenant::tasks::warn_when_period_overrun(
     143           20 :             started_at.elapsed(),
     144           20 :             metric_collection_interval,
     145           20 :             BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
     146           20 :         );
     147              : 
     148           20 :         let res = tokio::time::timeout_at(
     149           20 :             started_at + metric_collection_interval,
     150           20 :             task_mgr::shutdown_token().cancelled(),
     151           20 :         )
     152           17 :         .await;
     153           19 :         if res.is_ok() {
     154            3 :             return Ok(());
     155           16 :         }
     156              :     }
     157            3 : }
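
The bottom of the loop above is a reusable shutdown-aware scheduling pattern: tokio::time::timeout_at races the cancellation token against the next tick, so Err(Elapsed) means "start another iteration on schedule" and Ok(()) means "we were cancelled, exit cleanly". A minimal sketch of the loop skeleton, assuming the same tokio and tokio-util crates (flush and upload are hypothetical stand-ins for the disk_cache and upload calls):

    use std::time::Duration;
    use tokio::time::Instant;
    use tokio_util::sync::CancellationToken;

    async fn flush() { /* persist metrics to disk */ }
    async fn upload() { /* push metrics over HTTP */ }

    async fn periodic(interval: Duration, cancel: CancellationToken) {
        loop {
            let started_at = Instant::now();
            // Run both halves of the iteration concurrently, as above.
            let (_, _) = tokio::join!(flush(), upload());
            // Sleep until the next tick, waking early on cancellation:
            // Ok(()) = token fired first, Err(Elapsed) = tick arrived.
            if tokio::time::timeout_at(started_at + interval, cancel.cancelled())
                .await
                .is_ok()
            {
                return;
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();
        // Cancel after ~3 ticks to demonstrate clean shutdown.
        tokio::spawn({
            let cancel = cancel.clone();
            async move {
                tokio::time::sleep(Duration::from_millis(350)).await;
                cancel.cancel();
            }
        });
        periodic(Duration::from_millis(100), cancel).await;
    }
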
     158              : 
      159              : /// Called on the first iteration in an attempt to rejoin the metric uploading schedule of the
      160              : /// previous pageserver session. The pageserver is supposed to upload at fixed intervals regardless of restarts.
     161              : ///
     162              : /// Cancellation safe.
     163            6 : async fn restore_and_reschedule(
     164            6 :     path: &Arc<Utf8PathBuf>,
     165            6 :     metric_collection_interval: Duration,
     166            6 : ) -> Cache {
     167            6 :     let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
     168            3 :         Ok(found_some) => {
     169            3 :             // there is no min needed because we write these sequentially in
     170            3 :             // collect_all_metrics
     171            3 :             let earlier_metric_at = found_some
     172            3 :                 .iter()
     173            3 :                 .map(|(_, (et, _))| et.recorded_at())
     174            3 :                 .copied()
     175            3 :                 .next();
     176            3 : 
     177            3 :             let cached = found_some.into_iter().collect::<Cache>();
     178            3 : 
     179            3 :             (cached, earlier_metric_at)
     180              :         }
     181            3 :         Err(e) => {
     182            3 :             use std::io::{Error, ErrorKind};
     183            3 : 
     184            3 :             let root = e.root_cause();
     185            3 :             let maybe_ioerr = root.downcast_ref::<Error>();
     186            3 :             let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
     187            3 : 
     188            3 :             if !is_not_found {
     189            0 :                 tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
     190            3 :             }
     191              : 
     192            3 :             (HashMap::new(), None)
     193              :         }
     194              :     };
     195              : 
     196            6 :     if let Some(earlier_metric_at) = earlier_metric_at {
     197            3 :         let earlier_metric_at: SystemTime = earlier_metric_at.into();
     198              : 
     199            3 :         let error = reschedule(earlier_metric_at, metric_collection_interval).await;
     200              : 
     201            3 :         if let Some(error) = error {
     202            3 :             if error.as_secs() >= 60 {
     203            0 :                 tracing::info!(
     204            0 :                     error_ms = error.as_millis(),
     205            0 :                     "startup scheduling error due to restart"
     206            0 :                 )
     207            3 :             }
     208            0 :         }
     209            3 :     }
     210              : 
     211            6 :     cached
     212            6 : }
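
The Err arm above treats a missing cache file as the normal first-boot case: it walks the anyhow error chain to its root cause and stays quiet only when that root is an io::Error of kind NotFound. A minimal sketch of that check, assuming the anyhow crate:

    use std::io::ErrorKind;

    /// True if the root cause of `e` is a file-not-found I/O error.
    fn is_not_found(e: &anyhow::Error) -> bool {
        e.root_cause()
            .downcast_ref::<std::io::Error>()
            .is_some_and(|io| io.kind() == ErrorKind::NotFound)
    }

    fn main() {
        let err = anyhow::Error::new(std::io::Error::new(ErrorKind::NotFound, "no cache yet"));
        assert!(is_not_found(&err));
    }
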
     213              : 
     214            3 : async fn reschedule(
     215            3 :     earlier_metric_at: SystemTime,
     216            3 :     metric_collection_interval: Duration,
     217            3 : ) -> Option<Duration> {
     218            3 :     let now = SystemTime::now();
     219            3 :     match now.duration_since(earlier_metric_at) {
     220            3 :         Ok(from_last_send) if from_last_send < metric_collection_interval => {
     221            0 :             let sleep_for = metric_collection_interval - from_last_send;
     222            0 : 
     223            0 :             let deadline = std::time::Instant::now() + sleep_for;
     224            0 : 
     225            0 :             tokio::time::sleep_until(deadline.into()).await;
     226              : 
     227            0 :             let now = std::time::Instant::now();
     228            0 : 
      229            0 :             // executor threads might be busy; measure how far the wakeup drifted from the deadline
     230            0 :             Some(if now < deadline {
     231            0 :                 deadline - now
     232              :             } else {
     233            0 :                 now - deadline
     234              :             })
     235              :         }
     236            3 :         Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
     237              :         Err(_) => {
     238            0 :             tracing::warn!(
     239            0 :                 ?now,
     240            0 :                 ?earlier_metric_at,
      241            0 :                 "oldest recorded metric is in the future; the first values will come out with inconsistent timestamps"
     242            0 :             );
     243            0 :             earlier_metric_at.duration_since(now).ok()
     244              :         }
     245              :     }
     246            3 : }
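
Concretely, with a 60 s interval: a metric recorded 40 s before startup means sleeping roughly 20 s to rejoin the old schedule, while one recorded 90 s ago is already overdue, and the returned 30 s is what the caller logs as the startup scheduling error. A condensed sketch of that arithmetic (startup_gap is a hypothetical helper; it skips the post-sleep drift measurement done above):

    use std::time::{Duration, SystemTime};

    /// Time still to wait before the next send, or how far past
    /// the interval we already are if the send is overdue.
    fn startup_gap(last_sent: SystemTime, interval: Duration) -> Option<Duration> {
        match SystemTime::now().duration_since(last_sent) {
            Ok(elapsed) if elapsed < interval => Some(interval - elapsed),
            Ok(elapsed) => Some(elapsed.saturating_sub(interval)),
            Err(_) => None, // last metric is from the future; clock skew
        }
    }

    fn main() {
        let interval = Duration::from_secs(60);
        let last = SystemTime::now() - Duration::from_secs(40);
        // ~20s left before the next scheduled send
        println!("{:?}", startup_gap(last, interval));
    }
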
     247              : 
      248              : /// Calculate the synthetic size for each active tenant
     249            6 : async fn calculate_synthetic_size_worker(
     250            6 :     synthetic_size_calculation_interval: Duration,
     251            6 :     cancel: &CancellationToken,
     252            6 :     ctx: &RequestContext,
     253            6 : ) -> anyhow::Result<()> {
     254            6 :     info!("starting calculate_synthetic_size_worker");
     255            3 :     scopeguard::defer! {
     256            3 :         info!("calculate_synthetic_size_worker stopped");
     257              :     };
     258              : 
     259           25 :     loop {
     260           25 :         let started_at = Instant::now();
     261              : 
     262           25 :         let tenants = match mgr::list_tenants().await {
     263           25 :             Ok(tenants) => tenants,
     264            0 :             Err(e) => {
     265            0 :                 warn!("cannot get tenant list: {e:#}");
     266            0 :                 continue;
     267              :             }
     268              :         };
     269              : 
     270           46 :         for (tenant_shard_id, tenant_state, _gen) in tenants {
     271           22 :             if tenant_state != TenantState::Active {
     272            0 :                 continue;
     273           22 :             }
     274           22 : 
     275           22 :             if !tenant_shard_id.is_zero() {
     276              :                 // We only send consumption metrics from shard 0, so don't waste time calculating
     277              :                 // synthetic size on other shards.
     278            0 :                 continue;
     279           22 :             }
     280              : 
     281           22 :             let Ok(tenant) = mgr::get_tenant(tenant_shard_id, true) else {
     282            0 :                 continue;
     283              :             };
     284              : 
     285              :             // there is never any reason to exit calculate_synthetic_size_worker following any
     286              :             // return value -- we don't need to care about shutdown because no tenant is found when
     287              :             // pageserver is shut down.
     288           22 :             calculate_and_log(&tenant, cancel, ctx).await;
     289              :         }
     290              : 
     291           24 :         crate::tenant::tasks::warn_when_period_overrun(
     292           24 :             started_at.elapsed(),
     293           24 :             synthetic_size_calculation_interval,
     294           24 :             BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
     295           24 :         );
     296              : 
     297           24 :         let res = tokio::time::timeout_at(
     298           24 :             started_at + synthetic_size_calculation_interval,
     299           24 :             cancel.cancelled(),
     300           24 :         )
     301           22 :         .await;
     302           22 :         if res.is_ok() {
     303            3 :             return Ok(());
     304           19 :         }
     305              :     }
     306            3 : }
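
The scopeguard::defer! at the top of the worker guarantees the "stopped" log line on every exit path, early returns and errors included. A minimal sketch of the same pattern, assuming the scopeguard crate:

    fn main() {
        println!("starting worker");
        scopeguard::defer! {
            // Runs when the enclosing scope exits, on any path.
            println!("worker stopped");
        };
        println!("working");
    } // "worker stopped" prints here
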
     307              : 
     308           22 : async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
     309              :     const CAUSE: LogicalSizeCalculationCause =
     310              :         LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
     311              : 
     312              :     // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
     313              :     // We can put in some prioritization for consumption metrics.
     314              :     // Same for the loop that fetches computed metrics.
     315              :     // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
      316              :     // which turns out to be really handy for understanding the system.
     317           22 :     let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
     318           21 :         return;
     319              :     };
     320              : 
      321              :     // this error can be returned if the timeline is shutting down, but it does not
      322              :     // mean the synthetic size worker should terminate. We do not need any checks
      323              :     // in this function because `mgr::get_tenant` will error out after shutdown has
      324              :     // progressed to shutting down tenants.
     325            0 :     let shutting_down = matches!(
     326            0 :         e.downcast_ref::<PageReconstructError>(),
     327              :         Some(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
     328              :     );
     329              : 
     330            0 :     if !shutting_down {
     331            0 :         let tenant_shard_id = tenant.tenant_shard_id();
     332            0 :         error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
     333            0 :     }
     334           21 : }
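
The early return above leans on let-else to peel off the Ok case, then downcast_ref plus matches! to decide whether the error is just shutdown noise. A minimal sketch of the same shape, assuming anyhow, with a hypothetical WorkError::Cancelled standing in for the PageReconstructError variants:

    use std::fmt;

    #[derive(Debug)]
    enum WorkError {
        Cancelled,
    }

    impl fmt::Display for WorkError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "cancelled")
        }
    }

    impl std::error::Error for WorkError {}

    fn log_unless_cancelled(res: anyhow::Result<()>) {
        // let-else: return early on Ok, keep the error binding otherwise.
        let Err(e) = res else { return };
        // Probe the concrete error type inside the anyhow chain.
        let shutting_down =
            matches!(e.downcast_ref::<WorkError>(), Some(WorkError::Cancelled));
        if !shutting_down {
            eprintln!("failed: {e:#}");
        }
    }

    fn main() {
        log_unless_cancelled(Ok(()));                           // silent
        log_unless_cancelled(Err(WorkError::Cancelled.into())); // silent: shutdown noise
    }
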
        

Generated by: LCOV version 2.1-beta