LCOV - code coverage report
Current view: top level - pageserver/src - consumption_metrics.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 0.0 % 232 0
Test Date: 2024-05-10 13:18:37 Functions: 0.0 % 15 0

            Line data    Source code
       1              : //! Periodically collect consumption metrics for all active tenants
       2              : //! and push them to a HTTP endpoint.
       3              : use crate::context::{DownloadBehavior, RequestContext};
       4              : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
       5              : use crate::tenant::tasks::BackgroundLoopKind;
       6              : use crate::tenant::{
       7              :     mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant,
       8              : };
       9              : use camino::Utf8PathBuf;
      10              : use consumption_metrics::EventType;
      11              : use pageserver_api::models::TenantState;
      12              : use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
      13              : use reqwest::Url;
      14              : use std::collections::HashMap;
      15              : use std::sync::Arc;
      16              : use std::time::{Duration, SystemTime};
      17              : use tokio::time::Instant;
      18              : use tokio_util::sync::CancellationToken;
      19              : use tracing::*;
      20              : use utils::id::NodeId;
      21              : 
      22              : mod metrics;
      23              : use crate::consumption_metrics::metrics::MetricsKey;
      24              : mod disk_cache;
      25              : mod upload;
      26              : 
      27              : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      28              : 
      29              : /// Basically a key-value pair, but usually in a Vec except for [`Cache`].
      30              : ///
      31              : /// This is as opposed to `consumption_metrics::Event` which is the externally communicated form.
      32              : /// Difference is basically the missing idempotency key, which lives only for the duration of
      33              : /// upload attempts.
      34              : type RawMetric = (MetricsKey, (EventType, u64));
      35              : 
      36              : /// Caches the [`RawMetric`]s
      37              : ///
      38              : /// In practice, during startup, last sent values are stored here to be used in calculating new
      39              : /// ones. After successful uploading, the cached values are updated to cache. This used to be used
      40              : /// for deduplication, but that is no longer needed.
      41              : type Cache = HashMap<MetricsKey, (EventType, u64)>;
      42              : 
      43              : /// Main thread that serves metrics collection
      44              : #[allow(clippy::too_many_arguments)]
      45            0 : pub async fn collect_metrics(
      46            0 :     tenant_manager: Arc<TenantManager>,
      47            0 :     metric_collection_endpoint: &Url,
      48            0 :     metric_collection_bucket: &Option<RemoteStorageConfig>,
      49            0 :     metric_collection_interval: Duration,
      50            0 :     _cached_metric_collection_interval: Duration,
      51            0 :     synthetic_size_calculation_interval: Duration,
      52            0 :     node_id: NodeId,
      53            0 :     local_disk_storage: Utf8PathBuf,
      54            0 :     cancel: CancellationToken,
      55            0 :     ctx: RequestContext,
      56            0 : ) -> anyhow::Result<()> {
      57            0 :     if _cached_metric_collection_interval != Duration::ZERO {
      58            0 :         tracing::warn!(
      59            0 :             "cached_metric_collection_interval is no longer used, please set it to zero."
      60              :         )
      61            0 :     }
      62              : 
      63              :     // spin up background worker that caclulates tenant sizes
      64            0 :     let worker_ctx =
      65            0 :         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
      66            0 :     task_mgr::spawn(
      67            0 :         BACKGROUND_RUNTIME.handle(),
      68            0 :         TaskKind::CalculateSyntheticSize,
      69            0 :         None,
      70            0 :         None,
      71            0 :         "synthetic size calculation",
      72            0 :         false,
      73            0 :         {
      74            0 :             let tenant_manager = tenant_manager.clone();
      75            0 :             async move {
      76            0 :                 calculate_synthetic_size_worker(
      77            0 :                     tenant_manager,
      78            0 :                     synthetic_size_calculation_interval,
      79            0 :                     &cancel,
      80            0 :                     &worker_ctx,
      81            0 :                 )
      82            0 :                 .instrument(info_span!("synthetic_size_worker"))
      83            0 :                 .await?;
      84            0 :                 Ok(())
      85            0 :             }
      86            0 :         },
      87            0 :     );
      88            0 : 
      89            0 :     let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
      90            0 : 
      91            0 :     let cancel = task_mgr::shutdown_token();
      92            0 : 
      93            0 :     let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
      94              : 
      95            0 :     let mut cached_metrics = tokio::select! {
      96              :         _ = cancel.cancelled() => return Ok(()),
      97              :         ret = restore_and_reschedule => ret,
      98              :     };
      99              : 
     100              :     // define client here to reuse it for all requests
     101            0 :     let client = reqwest::ClientBuilder::new()
     102            0 :         .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
     103            0 :         .build()
     104            0 :         .expect("Failed to create http client with timeout");
     105              : 
     106            0 :     let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
     107            0 :         match GenericRemoteStorage::from_config(bucket_config) {
     108            0 :             Ok(client) => Some(client),
     109            0 :             Err(e) => {
     110            0 :                 // Non-fatal error: if we were given an invalid config, we will proceed
     111            0 :                 // with sending metrics over the network, but not to S3.
     112            0 :                 tracing::warn!("Invalid configuration for metric_collection_bucket: {e}");
     113            0 :                 None
     114              :             }
     115              :         }
     116              :     } else {
     117            0 :         None
     118              :     };
     119              : 
     120            0 :     let node_id = node_id.to_string();
     121              : 
     122            0 :     loop {
     123            0 :         let started_at = Instant::now();
     124              : 
     125              :         // these are point in time, with variable "now"
     126            0 :         let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
     127              : 
     128            0 :         let metrics = Arc::new(metrics);
     129            0 : 
     130            0 :         // why not race cancellation here? because we are one of the last tasks, and if we are
     131            0 :         // already here, better to try to flush the new values.
     132            0 : 
     133            0 :         let flush = async {
     134            0 :             match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
     135              :                 Ok(()) => {
     136            0 :                     tracing::debug!("flushed metrics to disk");
     137              :                 }
     138            0 :                 Err(e) => {
     139            0 :                     // idea here is that if someone creates a directory as our path, then they
     140            0 :                     // might notice it from the logs before shutdown and remove it
     141            0 :                     tracing::error!("failed to persist metrics to {path:?}: {e:#}");
     142              :                 }
     143              :             }
     144              : 
     145            0 :             if let Some(bucket_client) = &bucket_client {
     146            0 :                 let res =
     147            0 :                     upload::upload_metrics_bucket(bucket_client, &cancel, &node_id, &metrics).await;
     148            0 :                 if let Err(e) = res {
     149            0 :                     tracing::error!("failed to upload to S3: {e:#}");
     150            0 :                 }
     151            0 :             }
     152            0 :         };
     153              : 
     154            0 :         let upload = async {
     155            0 :             let res = upload::upload_metrics_http(
     156            0 :                 &client,
     157            0 :                 metric_collection_endpoint,
     158            0 :                 &cancel,
     159            0 :                 &node_id,
     160            0 :                 &metrics,
     161            0 :                 &mut cached_metrics,
     162            0 :             )
     163            0 :             .await;
     164            0 :             if let Err(e) = res {
     165              :                 // serialization error which should never happen
     166            0 :                 tracing::error!("failed to upload via HTTP due to {e:#}");
     167            0 :             }
     168            0 :         };
     169              : 
     170              :         // let these run concurrently
     171              :         let (_, _) = tokio::join!(flush, upload);
     172              : 
     173            0 :         crate::tenant::tasks::warn_when_period_overrun(
     174            0 :             started_at.elapsed(),
     175            0 :             metric_collection_interval,
     176            0 :             BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
     177            0 :         );
     178              : 
     179            0 :         let res = tokio::time::timeout_at(
     180            0 :             started_at + metric_collection_interval,
     181            0 :             task_mgr::shutdown_token().cancelled(),
     182            0 :         )
     183            0 :         .await;
     184            0 :         if res.is_ok() {
     185            0 :             return Ok(());
     186            0 :         }
     187              :     }
     188            0 : }
     189              : 
     190              : /// Called on the first iteration in an attempt to join the metric uploading schedule from previous
     191              : /// pageserver session. Pageserver is supposed to upload at intervals regardless of restarts.
     192              : ///
     193              : /// Cancellation safe.
     194            0 : async fn restore_and_reschedule(
     195            0 :     path: &Arc<Utf8PathBuf>,
     196            0 :     metric_collection_interval: Duration,
     197            0 : ) -> Cache {
     198            0 :     let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
     199            0 :         Ok(found_some) => {
     200            0 :             // there is no min needed because we write these sequentially in
     201            0 :             // collect_all_metrics
     202            0 :             let earlier_metric_at = found_some
     203            0 :                 .iter()
     204            0 :                 .map(|(_, (et, _))| et.recorded_at())
     205            0 :                 .copied()
     206            0 :                 .next();
     207            0 : 
     208            0 :             let cached = found_some.into_iter().collect::<Cache>();
     209            0 : 
     210            0 :             (cached, earlier_metric_at)
     211              :         }
     212            0 :         Err(e) => {
     213            0 :             use std::io::{Error, ErrorKind};
     214            0 : 
     215            0 :             let root = e.root_cause();
     216            0 :             let maybe_ioerr = root.downcast_ref::<Error>();
     217            0 :             let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
     218            0 : 
     219            0 :             if !is_not_found {
     220            0 :                 tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
     221            0 :             }
     222              : 
     223            0 :             (HashMap::new(), None)
     224              :         }
     225              :     };
     226              : 
     227            0 :     if let Some(earlier_metric_at) = earlier_metric_at {
     228            0 :         let earlier_metric_at: SystemTime = earlier_metric_at.into();
     229              : 
     230            0 :         let error = reschedule(earlier_metric_at, metric_collection_interval).await;
     231              : 
     232            0 :         if let Some(error) = error {
     233            0 :             if error.as_secs() >= 60 {
     234            0 :                 tracing::info!(
     235            0 :                     error_ms = error.as_millis(),
     236            0 :                     "startup scheduling error due to restart"
     237              :                 )
     238            0 :             }
     239            0 :         }
     240            0 :     }
     241              : 
     242            0 :     cached
     243            0 : }
     244              : 
     245            0 : async fn reschedule(
     246            0 :     earlier_metric_at: SystemTime,
     247            0 :     metric_collection_interval: Duration,
     248            0 : ) -> Option<Duration> {
     249            0 :     let now = SystemTime::now();
     250            0 :     match now.duration_since(earlier_metric_at) {
     251            0 :         Ok(from_last_send) if from_last_send < metric_collection_interval => {
     252            0 :             let sleep_for = metric_collection_interval - from_last_send;
     253            0 : 
     254            0 :             let deadline = std::time::Instant::now() + sleep_for;
     255            0 : 
     256            0 :             tokio::time::sleep_until(deadline.into()).await;
     257              : 
     258            0 :             let now = std::time::Instant::now();
     259            0 : 
     260            0 :             // executor threads might be busy, add extra measurements
     261            0 :             Some(if now < deadline {
     262            0 :                 deadline - now
     263              :             } else {
     264            0 :                 now - deadline
     265              :             })
     266              :         }
     267            0 :         Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
     268              :         Err(_) => {
     269            0 :             tracing::warn!(
     270              :                 ?now,
     271              :                 ?earlier_metric_at,
     272            0 :                 "oldest recorded metric is in future; first values will come out with inconsistent timestamps"
     273              :             );
     274            0 :             earlier_metric_at.duration_since(now).ok()
     275              :         }
     276              :     }
     277            0 : }
     278              : 
     279              : /// Caclculate synthetic size for each active tenant
     280            0 : async fn calculate_synthetic_size_worker(
     281            0 :     tenant_manager: Arc<TenantManager>,
     282            0 :     synthetic_size_calculation_interval: Duration,
     283            0 :     cancel: &CancellationToken,
     284            0 :     ctx: &RequestContext,
     285            0 : ) -> anyhow::Result<()> {
     286            0 :     info!("starting calculate_synthetic_size_worker");
     287              :     scopeguard::defer! {
     288              :         info!("calculate_synthetic_size_worker stopped");
     289              :     };
     290              : 
     291            0 :     loop {
     292            0 :         let started_at = Instant::now();
     293              : 
     294            0 :         let tenants = match tenant_manager.list_tenants() {
     295            0 :             Ok(tenants) => tenants,
     296            0 :             Err(e) => {
     297            0 :                 warn!("cannot get tenant list: {e:#}");
     298            0 :                 continue;
     299              :             }
     300              :         };
     301              : 
     302            0 :         for (tenant_shard_id, tenant_state, _gen) in tenants {
     303            0 :             if tenant_state != TenantState::Active {
     304            0 :                 continue;
     305            0 :             }
     306            0 : 
     307            0 :             if !tenant_shard_id.is_shard_zero() {
     308              :                 // We only send consumption metrics from shard 0, so don't waste time calculating
     309              :                 // synthetic size on other shards.
     310            0 :                 continue;
     311            0 :             }
     312              : 
     313            0 :             let Ok(tenant) = tenant_manager.get_attached_tenant_shard(tenant_shard_id) else {
     314            0 :                 continue;
     315              :             };
     316              : 
     317            0 :             if !tenant.is_active() {
     318            0 :                 continue;
     319            0 :             }
     320            0 : 
     321            0 :             // there is never any reason to exit calculate_synthetic_size_worker following any
     322            0 :             // return value -- we don't need to care about shutdown because no tenant is found when
     323            0 :             // pageserver is shut down.
     324            0 :             calculate_and_log(&tenant, cancel, ctx).await;
     325              :         }
     326              : 
     327            0 :         crate::tenant::tasks::warn_when_period_overrun(
     328            0 :             started_at.elapsed(),
     329            0 :             synthetic_size_calculation_interval,
     330            0 :             BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
     331            0 :         );
     332              : 
     333            0 :         let res = tokio::time::timeout_at(
     334            0 :             started_at + synthetic_size_calculation_interval,
     335            0 :             cancel.cancelled(),
     336            0 :         )
     337            0 :         .await;
     338            0 :         if res.is_ok() {
     339            0 :             return Ok(());
     340            0 :         }
     341              :     }
     342            0 : }
     343              : 
     344            0 : async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
     345              :     const CAUSE: LogicalSizeCalculationCause =
     346              :         LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
     347              : 
     348              :     // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
     349              :     // We can put in some prioritization for consumption metrics.
     350              :     // Same for the loop that fetches computed metrics.
     351              :     // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
     352              :     // which turns out is really handy to understand the system.
     353            0 :     let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
     354            0 :         return;
     355              :     };
     356              : 
     357              :     // this error can be returned if timeline is shutting down, but it does not
     358              :     // mean the synthetic size worker should terminate.
     359            0 :     let shutting_down = matches!(
     360            0 :         e.downcast_ref::<PageReconstructError>(),
     361              :         Some(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
     362              :     );
     363              : 
     364            0 :     if !shutting_down {
     365            0 :         let tenant_shard_id = tenant.tenant_shard_id();
     366            0 :         error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
     367            0 :     }
     368            0 : }
        

Generated by: LCOV version 2.1-beta