LCOV - differential code coverage report
Current view: top level - pageserver/src - consumption_metrics.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 82.1 % 212 174 38 174
Current Date: 2024-01-09 02:06:09 Functions: 72.4 % 29 21 8 21
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Periodically collect consumption metrics for all active tenants
       2                 : //! and push them to a HTTP endpoint.
       3                 : use crate::context::{DownloadBehavior, RequestContext};
       4                 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
       5                 : use crate::tenant::tasks::BackgroundLoopKind;
       6                 : use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
       7                 : use camino::Utf8PathBuf;
       8                 : use consumption_metrics::EventType;
       9                 : use pageserver_api::models::TenantState;
      10                 : use reqwest::Url;
      11                 : use std::collections::HashMap;
      12                 : use std::sync::Arc;
      13                 : use std::time::{Duration, SystemTime};
      14                 : use tokio::time::Instant;
      15                 : use tokio_util::sync::CancellationToken;
      16                 : use tracing::*;
      17                 : use utils::id::NodeId;
      18                 : 
      19                 : mod metrics;
      20                 : use metrics::MetricsKey;
      21                 : mod disk_cache;
      22                 : mod upload;
      23                 : 
      24                 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      25                 : 
      26                 : /// Basically a key-value pair, but usually in a Vec except for [`Cache`].
      27                 : ///
      28                 : /// This is as opposed to `consumption_metrics::Event` which is the externally communicated form.
      29                 : /// Difference is basically the missing idempotency key, which lives only for the duration of
      30                 : /// upload attempts.
      31                 : type RawMetric = (MetricsKey, (EventType, u64));
      32                 : 
      33                 : /// Caches the [`RawMetric`]s
      34                 : ///
      35                 : /// In practice, during startup, last sent values are stored here to be used in calculating new
      36                 : /// ones. After successful uploading, the cached values are updated. This used to be used
      37                 : /// for deduplication, but that is no longer needed.
      38                 : type Cache = HashMap<MetricsKey, (EventType, u64)>;
      39                 : 
      40                 : /// Main thread that serves metrics collection
      41                 : #[allow(clippy::too_many_arguments)]
      42 CBC           6 : pub async fn collect_metrics(
      43               6 :     metric_collection_endpoint: &Url,
      44               6 :     metric_collection_interval: Duration,
      45               6 :     _cached_metric_collection_interval: Duration,
      46               6 :     synthetic_size_calculation_interval: Duration,
      47               6 :     node_id: NodeId,
      48               6 :     local_disk_storage: Utf8PathBuf,
      49               6 :     cancel: CancellationToken,
      50               6 :     ctx: RequestContext,
      51               6 : ) -> anyhow::Result<()> {
      52               6 :     if _cached_metric_collection_interval != Duration::ZERO {
      53 UBC           0 :         tracing::warn!(
      54               0 :             "cached_metric_collection_interval is no longer used, please set it to zero."
      55               0 :         )
      56 CBC           6 :     }
      57                 : 
      58                 :     // spin up background worker that calculates tenant sizes
      59               6 :     let worker_ctx =
      60               6 :         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
      61               6 :     task_mgr::spawn(
      62               6 :         BACKGROUND_RUNTIME.handle(),
      63               6 :         TaskKind::CalculateSyntheticSize,
      64               6 :         None,
      65               6 :         None,
      66               6 :         "synthetic size calculation",
      67               6 :         false,
      68               6 :         async move {
      69               6 :             calculate_synthetic_size_worker(
      70               6 :                 synthetic_size_calculation_interval,
      71               6 :                 &cancel,
      72               6 :                 &worker_ctx,
      73               6 :             )
      74               6 :             .instrument(info_span!("synthetic_size_worker"))
      75              35 :             .await?;
      76               3 :             Ok(())
      77               6 :         },
      78               6 :     );
      79               6 : 
      80               6 :     let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
      81               6 : 
      82               6 :     let cancel = task_mgr::shutdown_token();
      83               6 : 
      84               6 :     let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
      85                 : 
      86               6 :     let mut cached_metrics = tokio::select! {
      87                 :         _ = cancel.cancelled() => return Ok(()),
      88               6 :         ret = restore_and_reschedule => ret,
      89                 :     };
      90                 : 
      91                 :     // define client here to reuse it for all requests
      92               6 :     let client = reqwest::ClientBuilder::new()
      93               6 :         .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
      94               6 :         .build()
      95               6 :         .expect("Failed to create http client with timeout");
      96               6 : 
      97               6 :     let node_id = node_id.to_string();
      98                 : 
      99              30 :     loop {
     100              30 :         let started_at = Instant::now();
     101                 : 
     102                 :         // these are point in time, with variable "now"
     103              30 :         let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
     104                 : 
     105              30 :         let metrics = Arc::new(metrics);
     106              30 : 
     107              30 :         // why not race cancellation here? because we are one of the last tasks, and if we are
     108              30 :         // already here, better to try to flush the new values.
     109              30 : 
     110              30 :         let flush = async {
     111              88 :             match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
     112                 :                 Ok(()) => {
     113              29 :                     tracing::debug!("flushed metrics to disk");
     114                 :                 }
     115 UBC           0 :                 Err(e) => {
     116               0 :                     // idea here is that if someone creates a directory as our path, then they
     117               0 :                     // might notice it from the logs before shutdown and remove it
     118               0 :                     tracing::error!("failed to persist metrics to {path:?}: {e:#}");
     119                 :                 }
     120                 :             }
     121 CBC          29 :         };
     122                 : 
     123              30 :         let upload = async {
     124              30 :             let res = upload::upload_metrics(
     125              30 :                 &client,
     126              30 :                 metric_collection_endpoint,
     127              30 :                 &cancel,
     128              30 :                 &node_id,
     129              30 :                 &metrics,
     130              30 :                 &mut cached_metrics,
     131              30 :             )
     132             259 :             .await;
     133              29 :             if let Err(e) = res {
     134                 :                 // serialization error which should never happen
     135 UBC           0 :                 tracing::error!("failed to upload due to {e:#}");
     136 CBC          29 :             }
     137              29 :         };
     138                 : 
     139                 :         // let these run concurrently
     140             297 :         let (_, _) = tokio::join!(flush, upload);
     141                 : 
     142              28 :         crate::tenant::tasks::warn_when_period_overrun(
     143              28 :             started_at.elapsed(),
     144              28 :             metric_collection_interval,
     145              28 :             BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
     146              28 :         );
     147                 : 
     148              28 :         let res = tokio::time::timeout_at(
     149              28 :             started_at + metric_collection_interval,
     150              28 :             task_mgr::shutdown_token().cancelled(),
     151              28 :         )
     152              24 :         .await;
     153              27 :         if res.is_ok() {
     154               3 :             return Ok(());
     155              24 :         }
     156                 :     }
     157               3 : }
     158                 : 
     159                 : /// Called on the first iteration in an attempt to join the metric uploading schedule from previous
     160                 : /// pageserver session. Pageserver is supposed to upload at intervals regardless of restarts.
     161                 : ///
     162                 : /// Cancellation safe.
     163               6 : async fn restore_and_reschedule(
     164               6 :     path: &Arc<Utf8PathBuf>,
     165               6 :     metric_collection_interval: Duration,
     166               6 : ) -> Cache {
     167               6 :     let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
     168               3 :         Ok(found_some) => {
     169               3 :             // there is no min needed because we write these sequentially in
     170               3 :             // collect_all_metrics
     171               3 :             let earlier_metric_at = found_some
     172               3 :                 .iter()
     173               3 :                 .map(|(_, (et, _))| et.recorded_at())
     174               3 :                 .copied()
     175               3 :                 .next();
     176               3 : 
     177               3 :             let cached = found_some.into_iter().collect::<Cache>();
     178               3 : 
     179               3 :             (cached, earlier_metric_at)
     180                 :         }
     181               3 :         Err(e) => {
     182               3 :             use std::io::{Error, ErrorKind};
     183               3 : 
     184               3 :             let root = e.root_cause();
     185               3 :             let maybe_ioerr = root.downcast_ref::<Error>();
     186               3 :             let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
     187               3 : 
     188               3 :             if !is_not_found {
     189 UBC           0 :                 tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
     190 CBC           3 :             }
     191                 : 
     192               3 :             (HashMap::new(), None)
     193                 :         }
     194                 :     };
     195                 : 
     196               6 :     if let Some(earlier_metric_at) = earlier_metric_at {
     197               3 :         let earlier_metric_at: SystemTime = earlier_metric_at.into();
     198                 : 
     199               3 :         let error = reschedule(earlier_metric_at, metric_collection_interval).await;
     200                 : 
     201               3 :         if let Some(error) = error {
     202               3 :             if error.as_secs() >= 60 {
     203 UBC           0 :                 tracing::info!(
     204               0 :                     error_ms = error.as_millis(),
     205               0 :                     "startup scheduling error due to restart"
     206               0 :                 )
     207 CBC           3 :             }
     208 UBC           0 :         }
     209 CBC           3 :     }
     210                 : 
     211               6 :     cached
     212               6 : }
     213                 : 
     214               3 : async fn reschedule(
     215               3 :     earlier_metric_at: SystemTime,
     216               3 :     metric_collection_interval: Duration,
     217               3 : ) -> Option<Duration> {
     218               3 :     let now = SystemTime::now();
     219               3 :     match now.duration_since(earlier_metric_at) {
     220               3 :         Ok(from_last_send) if from_last_send < metric_collection_interval => {
     221 UBC           0 :             let sleep_for = metric_collection_interval - from_last_send;
     222               0 : 
     223               0 :             let deadline = std::time::Instant::now() + sleep_for;
     224               0 : 
     225               0 :             tokio::time::sleep_until(deadline.into()).await;
     226                 : 
     227               0 :             let now = std::time::Instant::now();
     228               0 : 
     229               0 :             // executor threads might be busy, add extra measurements
     230               0 :             Some(if now < deadline {
     231               0 :                 deadline - now
     232                 :             } else {
     233               0 :                 now - deadline
     234                 :             })
     235                 :         }
     236 CBC           3 :         Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
     237                 :         Err(_) => {
     238 UBC           0 :             tracing::warn!(
     239               0 :                 ?now,
     240               0 :                 ?earlier_metric_at,
     241               0 :                 "oldest recorded metric is in future; first values will come out with inconsistent timestamps"
     242               0 :             );
     243               0 :             earlier_metric_at.duration_since(now).ok()
     244                 :         }
     245                 :     }
     246 CBC           3 : }
     247                 : 
     248                 : /// Calculate synthetic size for each active tenant
     249               6 : async fn calculate_synthetic_size_worker(
     250               6 :     synthetic_size_calculation_interval: Duration,
     251               6 :     cancel: &CancellationToken,
     252               6 :     ctx: &RequestContext,
     253               6 : ) -> anyhow::Result<()> {
     254               6 :     info!("starting calculate_synthetic_size_worker");
     255               3 :     scopeguard::defer! {
     256               3 :         info!("calculate_synthetic_size_worker stopped");
     257                 :     };
     258                 : 
     259              32 :     loop {
     260              32 :         let started_at = Instant::now();
     261                 : 
     262              32 :         let tenants = match mgr::list_tenants().await {
     263              32 :             Ok(tenants) => tenants,
     264 UBC           0 :             Err(e) => {
     265               0 :                 warn!("cannot get tenant list: {e:#}");
     266               0 :                 continue;
     267                 :             }
     268                 :         };
     269                 : 
     270 CBC          61 :         for (tenant_shard_id, tenant_state) in tenants {
     271              29 :             if tenant_state != TenantState::Active {
     272 UBC           0 :                 continue;
     273 CBC          29 :             }
     274              29 : 
     275              29 :             if !tenant_shard_id.is_zero() {
     276                 :                 // We only send consumption metrics from shard 0, so don't waste time calculating
     277                 :                 // synthetic size on other shards.
     278 UBC           0 :                 continue;
     279 CBC          29 :             }
     280                 : 
     281              29 :             let Ok(tenant) = mgr::get_tenant(tenant_shard_id, true) else {
     282 UBC           0 :                 continue;
     283                 :             };
     284                 : 
     285                 :             // there is never any reason to exit calculate_synthetic_size_worker following any
     286                 :             // return value -- we don't need to care about shutdown because no tenant is found when
     287                 :             // pageserver is shut down.
     288 CBC          29 :             calculate_and_log(&tenant, cancel, ctx).await;
     289                 :         }
     290                 : 
     291              32 :         crate::tenant::tasks::warn_when_period_overrun(
     292              32 :             started_at.elapsed(),
     293              32 :             synthetic_size_calculation_interval,
     294              32 :             BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
     295              32 :         );
     296                 : 
     297              32 :         let res = tokio::time::timeout_at(
     298              32 :             started_at + synthetic_size_calculation_interval,
     299              32 :             cancel.cancelled(),
     300              32 :         )
     301              29 :         .await;
     302              29 :         if res.is_ok() {
     303               3 :             return Ok(());
     304              26 :         }
     305                 :     }
     306               3 : }
     307                 : 
     308              29 : async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
     309                 :     const CAUSE: LogicalSizeCalculationCause =
     310                 :         LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
     311                 : 
     312                 :     // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
     313                 :     // We can put in some prioritization for consumption metrics.
     314                 :     // Same for the loop that fetches computed metrics.
     315                 :     // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
     316                 :     // which turns out is really handy to understand the system.
     317              29 :     let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
     318              27 :         return;
     319                 :     };
     320                 : 
     321                 :     // this error can be returned if timeline is shutting down, but it does not
     322                 :     // mean the synthetic size worker should terminate. we do not need any checks
     323                 :     // in this function because `mgr::get_tenant` will error out after shutdown has
     324                 :     // progressed to shutting down tenants.
     325               2 :     let shutting_down = matches!(
     326               2 :         e.downcast_ref::<PageReconstructError>(),
     327                 :         Some(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
     328                 :     );
     329                 : 
     330               2 :     if !shutting_down {
     331               2 :         let tenant_shard_id = tenant.tenant_shard_id();
     332               2 :         error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
     333 UBC           0 :     }
     334 CBC          29 : }
        

Generated by: LCOV version 2.1-beta