LCOV - code coverage report
Current view: top level - pageserver/src - consumption_metrics.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 6.5 % 279 18
Test Date: 2025-03-12 18:28:53 Functions: 16.1 % 31 5

            Line data    Source code
       1              : //! Periodically collect consumption metrics for all active tenants
       2              : //! and push them to a HTTP endpoint.
       3              : use std::collections::HashMap;
       4              : use std::sync::Arc;
       5              : use std::time::{Duration, SystemTime};
       6              : 
       7              : use camino::Utf8PathBuf;
       8              : use consumption_metrics::EventType;
       9              : use itertools::Itertools as _;
      10              : use pageserver_api::models::TenantState;
      11              : use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
      12              : use reqwest::Url;
      13              : use serde::{Deserialize, Serialize};
      14              : use tokio::time::Instant;
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::*;
      17              : use utils::id::NodeId;
      18              : 
      19              : use crate::config::PageServerConf;
      20              : use crate::consumption_metrics::metrics::MetricsKey;
      21              : use crate::consumption_metrics::upload::KeyGen as _;
      22              : use crate::context::{DownloadBehavior, RequestContext};
      23              : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};
      24              : use crate::tenant::mgr::TenantManager;
      25              : use crate::tenant::size::CalculateSyntheticSizeError;
      26              : use crate::tenant::tasks::BackgroundLoopKind;
      27              : use crate::tenant::{LogicalSizeCalculationCause, Tenant};
      28              : 
      29              : mod disk_cache;
      30              : mod metrics;
      31              : mod upload;
      32              : 
      33              : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      34              : 
      35              : /// Basically a key-value pair, but usually in a Vec except for [`Cache`].
      36              : ///
      37              : /// This is as opposed to `consumption_metrics::Event` which is the externally communicated form.
      38              : /// Difference is basically the missing idempotency key, which lives only for the duration of
      39              : /// upload attempts.
      40              : type RawMetric = (MetricsKey, (EventType, u64));
      41              : 
      42              : /// The new serializable metrics format
      43            8 : #[derive(Serialize, Deserialize)]
      44              : struct NewMetricsRoot {
      45              :     version: usize,
      46              :     metrics: Vec<NewRawMetric>,
      47              : }
      48              : 
      49              : impl NewMetricsRoot {
      50            8 :     pub fn is_v2_metrics(json_value: &serde_json::Value) -> bool {
      51            8 :         if let Some(ver) = json_value.get("version") {
      52            4 :             if let Some(2) = ver.as_u64() {
      53            4 :                 return true;
      54            0 :             }
      55            4 :         }
      56            4 :         false
      57            8 :     }
      58              : }
      59              : 
      60              : /// Borrowing variant of the new serializable metrics format, used only for serialization
      61              : #[derive(Serialize)]
      62              : struct NewMetricsRefRoot<'a> {
      63              :     version: usize,
      64              :     metrics: &'a [NewRawMetric],
      65              : }
      66              : 
      67              : impl<'a> NewMetricsRefRoot<'a> {
      68            4 :     fn new(metrics: &'a [NewRawMetric]) -> Self {
      69            4 :         Self {
      70            4 :             version: 2,
      71            4 :             metrics,
      72            4 :         }
      73            4 :     }
      74              : }
      75              : 
      76              : /// The new serializable metrics format
      77           72 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
      78              : struct NewRawMetric {
      79              :     key: MetricsKey,
      80              :     kind: EventType,
      81              :     value: u64,
      82              :     // TODO: add generation field and check against generations
      83              : }
      84              : 
      85              : impl NewRawMetric {
      86              :     #[cfg(test)]
      87           36 :     fn to_kv_pair(&self) -> (MetricsKey, NewRawMetric) {
      88           36 :         (self.key, self.clone())
      89           36 :     }
      90              : }
      91              : 
      92              : /// Caches the [`RawMetric`]s
      93              : ///
      94              : /// In practice, during startup, the last sent values are stored here to be used in calculating new
      95              : /// ones. After a successful upload, the cached values are updated. This used to be used
      96              : /// for deduplication, but that is no longer needed.
      97              : type Cache = HashMap<MetricsKey, NewRawMetric>;
      98              : 
      99            0 : pub async fn run(
     100            0 :     conf: &'static PageServerConf,
     101            0 :     tenant_manager: Arc<TenantManager>,
     102            0 :     cancel: CancellationToken,
     103            0 : ) {
     104            0 :     let Some(metric_collection_endpoint) = conf.metric_collection_endpoint.as_ref() else {
     105            0 :         return;
     106              :     };
     107              : 
     108            0 :     let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
     109            0 : 
     110            0 :     let metrics_ctx = RequestContext::todo_child(
     111            0 :         TaskKind::MetricsCollection,
     112            0 :         // This task itself shouldn't download anything.
     113            0 :         // The actual size calculation does need downloads, and
     114            0 :         // creates a child context with the right DownloadBehavior.
     115            0 :         DownloadBehavior::Error,
     116            0 :     );
     117            0 :     let collect_metrics = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
     118            0 :         "consumption metrics collection",
     119            0 :         collect_metrics(
     120            0 :             tenant_manager.clone(),
     121            0 :             metric_collection_endpoint,
     122            0 :             &conf.metric_collection_bucket,
     123            0 :             conf.metric_collection_interval,
     124            0 :             conf.id,
     125            0 :             local_disk_storage,
     126            0 :             cancel.clone(),
     127            0 :             metrics_ctx,
     128            0 :         )
     129            0 :         .instrument(info_span!("metrics_collection")),
     130              :     ));
     131              : 
     132            0 :     let worker_ctx =
     133            0 :         RequestContext::todo_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
     134            0 :     let synthetic_size_worker = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
     135            0 :         "synthetic size calculation",
     136            0 :         calculate_synthetic_size_worker(
     137            0 :             tenant_manager.clone(),
     138            0 :             conf.synthetic_size_calculation_interval,
     139            0 :             cancel.clone(),
     140            0 :             worker_ctx,
     141            0 :         )
     142            0 :         .instrument(info_span!("synthetic_size_worker")),
     143              :     ));
     144              : 
     145            0 :     let (collect_metrics, synthetic_size_worker) =
     146            0 :         futures::future::join(collect_metrics, synthetic_size_worker).await;
     147            0 :     collect_metrics
     148            0 :         .expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
     149            0 :     synthetic_size_worker
     150            0 :         .expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
     151            0 : }
     152              : 
     153              : /// Main task that drives metrics collection
     154              : #[allow(clippy::too_many_arguments)]
     155            0 : async fn collect_metrics(
     156            0 :     tenant_manager: Arc<TenantManager>,
     157            0 :     metric_collection_endpoint: &Url,
     158            0 :     metric_collection_bucket: &Option<RemoteStorageConfig>,
     159            0 :     metric_collection_interval: Duration,
     160            0 :     node_id: NodeId,
     161            0 :     local_disk_storage: Utf8PathBuf,
     162            0 :     cancel: CancellationToken,
     163            0 :     ctx: RequestContext,
     164            0 : ) -> anyhow::Result<()> {
     165            0 :     let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
     166            0 : 
     167            0 :     let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
     168              : 
     169            0 :     let mut cached_metrics = tokio::select! {
     170            0 :         _ = cancel.cancelled() => return Ok(()),
     171            0 :         ret = restore_and_reschedule => ret,
     172            0 :     };
     173            0 : 
     174            0 :     // define client here to reuse it for all requests
     175            0 :     let client = reqwest::ClientBuilder::new()
     176            0 :         .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
     177            0 :         .build()
     178            0 :         .expect("Failed to create http client with timeout");
     179              : 
     180            0 :     let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
     181            0 :         match GenericRemoteStorage::from_config(bucket_config).await {
     182            0 :             Ok(client) => Some(client),
     183            0 :             Err(e) => {
     184            0 :                 // Non-fatal error: if we were given an invalid config, we will proceed
     185            0 :                 // with sending metrics over the network, but not to S3.
     186            0 :                 tracing::warn!("Invalid configuration for metric_collection_bucket: {e}");
     187            0 :                 None
     188              :             }
     189              :         }
     190              :     } else {
     191            0 :         None
     192              :     };
     193              : 
     194            0 :     let node_id = node_id.to_string();
     195              : 
     196              :     loop {
     197            0 :         let started_at = Instant::now();
     198              : 
     199              :         // these are point in time, with variable "now"
     200            0 :         let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
     201              : 
     202              :         // Pre-generate event idempotency keys, to reuse them across the bucket
     203              :         // and HTTP sinks.
     204            0 :         let idempotency_keys = std::iter::repeat_with(|| node_id.as_str().generate())
     205            0 :             .take(metrics.len())
     206            0 :             .collect_vec();
     207            0 : 
     208            0 :         let metrics = Arc::new(metrics);
     209            0 : 
     210            0 :         // why not race cancellation here? because we are one of the last tasks, and if we are
     211            0 :         // already here, better to try to flush the new values.
     212            0 : 
     213            0 :         let flush = async {
     214            0 :             match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
     215              :                 Ok(()) => {
     216            0 :                     tracing::debug!("flushed metrics to disk");
     217              :                 }
     218            0 :                 Err(e) => {
     219            0 :                     // idea here is that if someone creates a directory as our path, then they
     220            0 :                     // might notice it from the logs before shutdown and remove it
     221            0 :                     tracing::error!("failed to persist metrics to {path:?}: {e:#}");
     222              :                 }
     223              :             }
     224              : 
     225            0 :             if let Some(bucket_client) = &bucket_client {
     226            0 :                 let res = upload::upload_metrics_bucket(
     227            0 :                     bucket_client,
     228            0 :                     &cancel,
     229            0 :                     &node_id,
     230            0 :                     &metrics,
     231            0 :                     &idempotency_keys,
     232            0 :                 )
     233            0 :                 .await;
     234            0 :                 if let Err(e) = res {
     235            0 :                     tracing::error!("failed to upload to remote storage: {e:#}");
     236            0 :                 }
     237            0 :             }
     238            0 :         };
     239              : 
     240            0 :         let upload = async {
     241            0 :             let res = upload::upload_metrics_http(
     242            0 :                 &client,
     243            0 :                 metric_collection_endpoint,
     244            0 :                 &cancel,
     245            0 :                 &metrics,
     246            0 :                 &mut cached_metrics,
     247            0 :                 &idempotency_keys,
     248            0 :             )
     249            0 :             .await;
     250            0 :             if let Err(e) = res {
     251              :                 // serialization error which should never happen
     252            0 :                 tracing::error!("failed to upload via HTTP due to {e:#}");
     253            0 :             }
     254            0 :         };
     255              : 
     256              :         // let these run concurrently
     257            0 :         let (_, _) = tokio::join!(flush, upload);
     258              : 
     259            0 :         crate::tenant::tasks::warn_when_period_overrun(
     260            0 :             started_at.elapsed(),
     261            0 :             metric_collection_interval,
     262            0 :             BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
     263            0 :         );
     264              : 
     265            0 :         let res =
     266            0 :             tokio::time::timeout_at(started_at + metric_collection_interval, cancel.cancelled())
     267            0 :                 .await;
     268            0 :         if res.is_ok() {
     269            0 :             return Ok(());
     270            0 :         }
     271              :     }
     272            0 : }
     273              : 
     274              : /// Called on the first iteration in an attempt to join the metric uploading schedule from the previous
     275              : /// pageserver session. The pageserver is supposed to upload at regular intervals regardless of restarts.
     276              : ///
     277              : /// Cancellation safe.
     278            0 : async fn restore_and_reschedule(
     279            0 :     path: &Arc<Utf8PathBuf>,
     280            0 :     metric_collection_interval: Duration,
     281            0 : ) -> Cache {
     282            0 :     let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
     283            0 :         Ok(found_some) => {
     284            0 :             // there is no min needed because we write these sequentially in
     285            0 :             // collect_all_metrics
     286            0 :             let earlier_metric_at = found_some
     287            0 :                 .iter()
     288            0 :                 .map(|item| item.kind.recorded_at())
     289            0 :                 .copied()
     290            0 :                 .next();
     291            0 : 
     292            0 :             let cached = found_some
     293            0 :                 .into_iter()
     294            0 :                 .map(|item| (item.key, item))
     295            0 :                 .collect::<Cache>();
     296            0 : 
     297            0 :             (cached, earlier_metric_at)
     298              :         }
     299            0 :         Err(e) => {
     300              :             use std::io::{Error, ErrorKind};
     301              : 
     302            0 :             let root = e.root_cause();
     303            0 :             let maybe_ioerr = root.downcast_ref::<Error>();
     304            0 :             let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
     305            0 : 
     306            0 :             if !is_not_found {
     307            0 :                 tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
     308            0 :             }
     309              : 
     310            0 :             (HashMap::new(), None)
     311              :         }
     312              :     };
     313              : 
     314            0 :     if let Some(earlier_metric_at) = earlier_metric_at {
     315            0 :         let earlier_metric_at: SystemTime = earlier_metric_at.into();
     316              : 
     317            0 :         let error = reschedule(earlier_metric_at, metric_collection_interval).await;
     318              : 
     319            0 :         if let Some(error) = error {
     320            0 :             if error.as_secs() >= 60 {
     321            0 :                 tracing::info!(
     322            0 :                     error_ms = error.as_millis(),
     323            0 :                     "startup scheduling error due to restart"
     324              :                 )
     325            0 :             }
     326            0 :         }
     327            0 :     }
     328              : 
     329            0 :     cached
     330            0 : }
     331              : 
     332            0 : async fn reschedule(
     333            0 :     earlier_metric_at: SystemTime,
     334            0 :     metric_collection_interval: Duration,
     335            0 : ) -> Option<Duration> {
     336            0 :     let now = SystemTime::now();
     337            0 :     match now.duration_since(earlier_metric_at) {
     338            0 :         Ok(from_last_send) if from_last_send < metric_collection_interval => {
     339            0 :             let sleep_for = metric_collection_interval - from_last_send;
     340            0 : 
     341            0 :             let deadline = std::time::Instant::now() + sleep_for;
     342            0 : 
     343            0 :             tokio::time::sleep_until(deadline.into()).await;
     344              : 
     345            0 :             let now = std::time::Instant::now();
     346            0 : 
     347            0 :             // executor threads might be busy, add extra measurements
     348            0 :             Some(if now < deadline {
     349            0 :                 deadline - now
     350              :             } else {
     351            0 :                 now - deadline
     352              :             })
     353              :         }
     354            0 :         Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
     355              :         Err(_) => {
     356            0 :             tracing::warn!(
     357              :                 ?now,
     358              :                 ?earlier_metric_at,
     359            0 :                 "oldest recorded metric is in future; first values will come out with inconsistent timestamps"
     360              :             );
     361            0 :             earlier_metric_at.duration_since(now).ok()
     362              :         }
     363              :     }
     364            0 : }
     365              : 
     366              : /// Calculate synthetic size for each active tenant
     367            0 : async fn calculate_synthetic_size_worker(
     368            0 :     tenant_manager: Arc<TenantManager>,
     369            0 :     synthetic_size_calculation_interval: Duration,
     370            0 :     cancel: CancellationToken,
     371            0 :     ctx: RequestContext,
     372            0 : ) -> anyhow::Result<()> {
     373            0 :     info!("starting calculate_synthetic_size_worker");
     374            0 :     scopeguard::defer! {
     375            0 :         info!("calculate_synthetic_size_worker stopped");
     376            0 :     };
     377              : 
     378              :     loop {
     379            0 :         let started_at = Instant::now();
     380              : 
     381            0 :         let tenants = match tenant_manager.list_tenants() {
     382            0 :             Ok(tenants) => tenants,
     383            0 :             Err(e) => {
     384            0 :                 warn!("cannot get tenant list: {e:#}");
     385            0 :                 continue;
     386              :             }
     387              :         };
     388              : 
     389            0 :         for (tenant_shard_id, tenant_state, _gen) in tenants {
     390            0 :             if tenant_state != TenantState::Active {
     391            0 :                 continue;
     392            0 :             }
     393            0 : 
     394            0 :             if !tenant_shard_id.is_shard_zero() {
     395              :                 // We only send consumption metrics from shard 0, so don't waste time calculating
     396              :                 // synthetic size on other shards.
     397            0 :                 continue;
     398            0 :             }
     399              : 
     400            0 :             let Ok(tenant) = tenant_manager.get_attached_tenant_shard(tenant_shard_id) else {
     401            0 :                 continue;
     402              :             };
     403              : 
     404            0 :             if !tenant.is_active() {
     405            0 :                 continue;
     406            0 :             }
     407            0 : 
     408            0 :             // there is never any reason to exit calculate_synthetic_size_worker following any
     409            0 :             // return value -- we don't need to care about shutdown because no tenant is found when
     410            0 :             // pageserver is shut down.
     411            0 :             calculate_and_log(&tenant, &cancel, &ctx).await;
     412              :         }
     413              : 
     414            0 :         crate::tenant::tasks::warn_when_period_overrun(
     415            0 :             started_at.elapsed(),
     416            0 :             synthetic_size_calculation_interval,
     417            0 :             BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
     418            0 :         );
     419              : 
     420            0 :         let res = tokio::time::timeout_at(
     421            0 :             started_at + synthetic_size_calculation_interval,
     422            0 :             cancel.cancelled(),
     423            0 :         )
     424            0 :         .await;
     425            0 :         if res.is_ok() {
     426            0 :             return Ok(());
     427            0 :         }
     428              :     }
     429            0 : }
     430              : 
     431            0 : async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
     432              :     const CAUSE: LogicalSizeCalculationCause =
     433              :         LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
     434              : 
     435              :     // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
     436              :     // We can put in some prioritization for consumption metrics.
     437              :     // Same for the loop that fetches computed metrics.
     438              :     // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
     439              :     // which turns out is really handy to understand the system.
     440            0 :     match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
     441            0 :         Ok(_) => {}
     442            0 :         Err(CalculateSyntheticSizeError::Cancelled) => {}
     443            0 :         Err(e) => {
     444            0 :             let tenant_shard_id = tenant.tenant_shard_id();
     445            0 :             error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
     446              :         }
     447              :     }
     448            0 : }
        

Generated by: LCOV version 2.1-beta