LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/timeline - eviction_task.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 74.1 % 158 117 41 117
Current Date: 2023-10-19 02:04:12 Functions: 60.5 % 38 23 15 23
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! The per-timeline layer eviction task, which evicts data which has not been accessed for more
       2                 : //! than a given threshold.
       3                 : //!
       4                 : //! Data includes all kinds of caches, namely:
       5                 : //! - (in-memory layers)
       6                 : //! - on-demand downloaded layer files on disk
       7                 : //! - (cached layer file pages)
       8                 : //! - derived data from layer file contents, namely:
       9                 : //!     - initial logical size
      10                 : //!     - partitioning
      11                 : //!     - (other currently missing unknowns)
      12                 : //!
      13                 : //! Items with parentheses are not (yet) touched by this task.
      14                 : //!
      15                 : //! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
      16                 : use std::{
      17                 :     collections::HashMap,
      18                 :     ops::ControlFlow,
      19                 :     sync::Arc,
      20                 :     time::{Duration, SystemTime},
      21                 : };
      22                 : 
      23                 : use tokio::time::Instant;
      24                 : use tokio_util::sync::CancellationToken;
      25                 : use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
      26                 : 
      27                 : use crate::{
      28                 :     context::{DownloadBehavior, RequestContext},
      29                 :     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
      30                 :     tenant::{
      31                 :         config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
      32                 :         storage_layer::PersistentLayer,
      33                 :         tasks::{BackgroundLoopKind, RateLimitError},
      34                 :         timeline::EvictionError,
      35                 :         LogicalSizeCalculationCause, Tenant,
      36                 :     },
      37                 : };
      38                 : 
      39                 : use utils::completion;
      40                 : 
      41                 : use super::Timeline;
      42                 : 
      43 CBC        1302 : #[derive(Default)]
      44                 : pub struct EvictionTaskTimelineState {
      45                 :     last_layer_access_imitation: Option<tokio::time::Instant>,
      46                 : }
      47                 : 
      48             748 : #[derive(Default)]
      49                 : pub struct EvictionTaskTenantState {
      50                 :     last_layer_access_imitation: Option<Instant>,
      51                 : }
      52                 : 
      53                 : impl Timeline {
      54            1103 :     pub(super) fn launch_eviction_task(
      55            1103 :         self: &Arc<Self>,
      56            1103 :         background_tasks_can_start: Option<&completion::Barrier>,
      57            1103 :     ) {
      58            1103 :         let self_clone = Arc::clone(self);
      59            1103 :         let background_tasks_can_start = background_tasks_can_start.cloned();
      60            1103 :         task_mgr::spawn(
      61            1103 :             BACKGROUND_RUNTIME.handle(),
      62            1103 :             TaskKind::Eviction,
      63            1103 :             Some(self.tenant_id),
      64            1103 :             Some(self.timeline_id),
      65            1103 :             &format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id),
      66            1103 :             false,
      67            1103 :             async move {
      68            1103 :                 let cancel = task_mgr::shutdown_token();
      69            1274 :                 tokio::select! {
      70            1274 :                     _ = cancel.cancelled() => { return Ok(()); }
      71            1274 :                     _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
      72            1274 :                 };
      73                 : 
      74            4194 :                 self_clone.eviction_task(cancel).await;
      75             405 :                 Ok(())
      76            1103 :             },
      77            1103 :         );
      78            1103 :     }
      79                 : 
      80            3081 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
      81                 :     async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
      82                 :         use crate::tenant::tasks::random_init_delay;
      83                 :         {
      84                 :             let policy = self.get_eviction_policy();
      85                 :             let period = match policy {
      86                 :                 EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
      87                 :                 EvictionPolicy::NoEviction => Duration::from_secs(10),
      88                 :             };
      89                 :             if random_init_delay(period, &cancel).await.is_err() {
      90                 :                 return;
      91                 :             }
      92                 :         }
      93                 : 
      94                 :         let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
      95                 :         loop {
      96                 :             let policy = self.get_eviction_policy();
      97                 :             let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;
      98                 : 
      99                 :             match cf {
     100                 :                 ControlFlow::Break(()) => break,
     101                 :                 ControlFlow::Continue(sleep_until) => {
     102                 :                     if tokio::time::timeout_at(sleep_until, cancel.cancelled())
     103                 :                         .await
     104                 :                         .is_ok()
     105                 :                     {
     106                 :                         break;
     107                 :                     }
     108                 :                 }
     109                 :             }
     110                 :         }
     111                 :     }
     112                 : 
     113            1862 :     #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
     114                 :     async fn eviction_iteration(
     115                 :         self: &Arc<Self>,
     116                 :         policy: &EvictionPolicy,
     117                 :         cancel: &CancellationToken,
     118                 :         ctx: &RequestContext,
     119                 :     ) -> ControlFlow<(), Instant> {
     120 UBC           0 :         debug!("eviction iteration: {policy:?}");
     121                 :         match policy {
     122                 :             EvictionPolicy::NoEviction => {
     123                 :                 // check again in 10 seconds; XXX config watch mechanism
     124                 :                 ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
     125                 :             }
     126                 :             EvictionPolicy::LayerAccessThreshold(p) => {
     127                 :                 let start = Instant::now();
     128                 :                 match self.eviction_iteration_threshold(p, cancel, ctx).await {
     129                 :                     ControlFlow::Break(()) => return ControlFlow::Break(()),
     130                 :                     ControlFlow::Continue(()) => (),
     131                 :                 }
     132                 :                 let elapsed = start.elapsed();
     133                 :                 crate::tenant::tasks::warn_when_period_overrun(
     134                 :                     elapsed,
     135                 :                     p.period,
     136                 :                     BackgroundLoopKind::Eviction,
     137                 :                 );
     138                 :                 crate::metrics::EVICTION_ITERATION_DURATION
     139                 :                     .get_metric_with_label_values(&[
     140                 :                         &format!("{}", p.period.as_secs()),
     141                 :                         &format!("{}", p.threshold.as_secs()),
     142                 :                     ])
     143                 :                     .unwrap()
     144                 :                     .observe(elapsed.as_secs_f64());
     145                 :                 ControlFlow::Continue(start + p.period)
     146                 :             }
     147                 :         }
     148                 :     }
     149                 : 
     150 CBC          26 :     async fn eviction_iteration_threshold(
     151              26 :         self: &Arc<Self>,
     152              26 :         p: &EvictionPolicyLayerAccessThreshold,
     153              26 :         cancel: &CancellationToken,
     154              26 :         ctx: &RequestContext,
     155              26 :     ) -> ControlFlow<()> {
     156              26 :         let now = SystemTime::now();
     157                 : 
     158              26 :         let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(
     159              26 :             BackgroundLoopKind::Eviction,
     160              26 :             ctx,
     161              26 :             cancel,
     162              26 :         )
     163 UBC           0 :         .await
     164                 :         {
     165 CBC          26 :             Ok(permit) => permit,
     166 UBC           0 :             Err(RateLimitError::Cancelled) => return ControlFlow::Break(()),
     167                 :         };
     168                 : 
     169                 :         // If we evict layers but keep cached values derived from those layers, then
     170                 :         // we face a storm of on-demand downloads after pageserver restart.
     171                 :         // The reason is that the restart empties the caches, and so, the values
     172                 :         // need to be re-computed by accessing layers, which we evicted while the
     173                 :         // caches were filled.
     174                 :         //
     175                 :         // Solutions here would be one of the following:
     176                 :         // 1. Have a persistent cache.
     177                 :         // 2. Count every access to a cached value to the access stats of all layers
     178                 :         //    that were accessed to compute the value in the first place.
     179                 :         // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
     180                 :         //    get re-computed from layers, thereby counting towards layer access stats.
     181                 :         // 4. Make the eviction task imitate the layer accesses that typically hit caches.
     182                 :         //
     183                 :         // We follow approach (4) here because in Neon prod deployment:
     184                 :         // - page cache is quite small => high churn => low hit rate
     185                 :         //   => eviction gets correct access stats
     186                 :         // - value-level caches such as logical size & repartition have a high hit rate,
     187                 :         //   especially for inactive tenants
     188                 :         //   => eviction sees zero accesses for these
     189                 :         //   => they cause the on-demand download storm on pageserver restart
     190                 :         //
     191                 :         // We should probably move to persistent caches in the future, or avoid
     192                 :         // having inactive tenants attached to pageserver in the first place.
     193 CBC        2855 :         match self.imitate_layer_accesses(p, cancel, ctx).await {
     194 UBC           0 :             ControlFlow::Break(()) => return ControlFlow::Break(()),
     195 CBC          26 :             ControlFlow::Continue(()) => (),
     196              26 :         }
     197              26 : 
     198              26 :         #[allow(dead_code)]
     199              26 :         #[derive(Debug, Default)]
     200              26 :         struct EvictionStats {
     201              26 :             candidates: usize,
     202              26 :             evicted: usize,
     203              26 :             errors: usize,
     204              26 :             not_evictable: usize,
     205              26 :             skipped_for_shutdown: usize,
     206              26 :         }
     207              26 : 
     208              26 :         let mut stats = EvictionStats::default();
     209                 :         // Gather layers for eviction.
     210                 :         // NB: all the checks can be invalidated as soon as we release the layer map lock.
     211                 :         // We don't want to hold the layer map lock during eviction.
     212                 :         // So, we just need to deal with this.
     213              26 :         let candidates: Vec<Arc<dyn PersistentLayer>> = {
     214              26 :             let guard = self.layers.read().await;
     215              26 :             let layers = guard.layer_map();
     216              26 :             let mut candidates = Vec::new();
     217            1565 :             for hist_layer in layers.iter_historic_layers() {
     218            1565 :                 let hist_layer = guard.get_from_desc(&hist_layer);
     219            1565 :                 if hist_layer.is_remote_layer() {
     220             570 :                     continue;
     221             995 :                 }
     222             995 : 
     223             995 :                 let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
     224 UBC           0 :                     // We only use this fallback if there's an implementation error.
     225               0 :                     // `latest_activity` already does rate-limited warn!() log.
     226               0 :                     debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
     227               0 :                     SystemTime::now()
     228 CBC         995 :                 });
     229                 : 
     230             995 :                 let no_activity_for = match now.duration_since(last_activity_ts) {
     231             813 :                     Ok(d) => d,
     232             182 :                     Err(_e) => {
     233             182 :                         // We reach here if `now` < `last_activity_ts`, which can legitimately
     234             182 :                         // happen if there is an access between us getting `now`, and us getting
     235             182 :                         // the access stats from the layer.
     236             182 :                         //
     237             182 :                         // The other reason why it can happen is system clock skew because
     238             182 :                         // SystemTime::now() is not monotonic, so, even if there is no access
     239             182 :                         // to the layer after we get `now` at the beginning of this function,
     240             182 :                         // it could be that `now`  < `last_activity_ts`.
     241             182 :                         //
     242             182 :                         // To distinguish the cases, we would need to record `Instant`s in the
     243             182 :                         // access stats (i.e., monotonic timestamps), but then, the timestamp
     244             182 :                         // values in the access stats would need to be `Instant`s, and hence
     245             182 :                         // they would be meaningless outside of the pageserver process.
     246             182 :                         // At the time of writing, the trade-off is that access stats are more
     247             182 :                         // valuable than detecting clock skew.
     248             182 :                         continue;
     249                 :                     }
     250                 :                 };
     251             813 :                 if no_activity_for > p.threshold {
     252              38 :                     candidates.push(hist_layer)
     253             775 :                 }
     254                 :             }
     255              26 :             candidates
     256              26 :         };
     257              26 :         stats.candidates = candidates.len();
     258                 : 
     259              26 :         let remote_client = match self.remote_client.as_ref() {
     260                 :             None => {
     261 UBC           0 :                 error!(
     262               0 :                     num_candidates = candidates.len(),
     263               0 :                     "no remote storage configured, cannot evict layers"
     264               0 :                 );
     265               0 :                 return ControlFlow::Continue(());
     266                 :             }
     267 CBC          26 :             Some(c) => c,
     268                 :         };
     269                 : 
     270              26 :         let results = match self
     271              26 :             .evict_layer_batch(remote_client, &candidates[..], cancel.clone())
     272               5 :             .await
     273                 :         {
     274 UBC           0 :             Err(pre_err) => {
     275               0 :                 stats.errors += candidates.len();
     276               0 :                 error!("could not do any evictions: {pre_err:#}");
     277               0 :                 return ControlFlow::Continue(());
     278                 :             }
     279 CBC          26 :             Ok(results) => results,
     280              26 :         };
     281              26 :         assert_eq!(results.len(), candidates.len());
     282              38 :         for (l, result) in candidates.iter().zip(results) {
     283 UBC           0 :             match result {
     284               0 :                 None => {
     285               0 :                     stats.skipped_for_shutdown += 1;
     286               0 :                 }
     287 CBC          38 :                 Some(Ok(())) => {
     288              38 :                     stats.evicted += 1;
     289              38 :                 }
     290 UBC           0 :                 Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
     291               0 :                     stats.not_evictable += 1;
     292               0 :                 }
     293               0 :                 Some(Err(EvictionError::FileNotFound)) => {
     294               0 :                     // compaction/gc removed the file while we were waiting on layer_removal_cs
     295               0 :                     stats.not_evictable += 1;
     296               0 :                 }
     297                 :                 Some(Err(
     298               0 :                     e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
     299                 :                 )) => {
     300               0 :                     let e = utils::error::report_compact_sources(&e);
     301               0 :                     warn!(layer = %l, "failed to evict layer: {e}");
     302               0 :                     stats.not_evictable += 1;
     303                 :                 }
     304               0 :                 Some(Err(EvictionError::MetadataInconsistency(detail))) => {
     305               0 :                     warn!(layer = %l, "failed to evict layer: {detail}");
     306               0 :                     stats.not_evictable += 1;
     307                 :                 }
     308                 :             }
     309                 :         }
     310 CBC          26 :         if stats.candidates == stats.not_evictable {
     311 UBC           0 :             debug!(stats=?stats, "eviction iteration complete");
     312 CBC           1 :         } else if stats.errors > 0 || stats.not_evictable > 0 {
     313 UBC           0 :             warn!(stats=?stats, "eviction iteration complete");
     314                 :         } else {
     315 CBC           1 :             info!(stats=?stats, "eviction iteration complete");
     316                 :         }
     317              26 :         ControlFlow::Continue(())
     318              26 :     }
     319                 : 
     320             104 :     #[instrument(skip_all)]
     321                 :     async fn imitate_layer_accesses(
     322                 :         &self,
     323                 :         p: &EvictionPolicyLayerAccessThreshold,
     324                 :         cancel: &CancellationToken,
     325                 :         ctx: &RequestContext,
     326                 :     ) -> ControlFlow<()> {
     327                 :         let mut state = self.eviction_task_timeline_state.lock().await;
     328                 : 
     329                 :         // Only do the imitate_layer accesses approximately as often as the threshold.  A little
     330                 :         // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
     331                 :         let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);
     332                 : 
     333                 :         match state.last_layer_access_imitation {
     334                 :             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
     335                 :             _ => {
     336                 :                 self.imitate_timeline_cached_layer_accesses(cancel, ctx)
     337                 :                     .await;
     338                 :                 state.last_layer_access_imitation = Some(tokio::time::Instant::now())
     339                 :             }
     340                 :         }
     341                 :         drop(state);
     342                 : 
     343                 :         if cancel.is_cancelled() {
     344                 :             return ControlFlow::Break(());
     345                 :         }
     346                 : 
     347                 :         // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
     348                 :         // Make one of the tenant's timelines draw the short straw and run the calculation.
     349                 :         // The others wait until the calculation is done so that they take into account the
     350                 :         // imitated accesses that the winner made.
     351                 :         //
     352                 :         // It is critical we are responsive to cancellation here. Otherwise, we deadlock with
     353                 :         // tenant deletion (holds TENANTS in read mode) and any other task that attempts to
     354                 :         // acquire TENANTS in write mode before we call get_tenant here.
     355                 :         // See https://github.com/neondatabase/neon/issues/5284.
     356              26 :         let res = tokio::select! {
     357                 :             _ = cancel.cancelled() => {
     358                 :                 return ControlFlow::Break(());
     359                 :             }
     360              26 :             res = crate::tenant::mgr::get_tenant(self.tenant_id, true) => {
     361                 :                 res
     362                 :             }
     363                 :         };
     364                 :         let tenant = match res {
     365                 :             Ok(t) => t,
     366                 :             Err(_) => {
     367                 :                 return ControlFlow::Break(());
     368                 :             }
     369                 :         };
     370                 :         let mut state = tenant.eviction_task_tenant_state.lock().await;
     371                 :         match state.last_layer_access_imitation {
     372                 :             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
     373                 :             _ => {
     374                 :                 self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
     375                 :                     .await;
     376                 :                 state.last_layer_access_imitation = Some(tokio::time::Instant::now());
     377                 :             }
     378                 :         }
     379                 :         drop(state);
     380                 : 
     381                 :         if cancel.is_cancelled() {
     382                 :             return ControlFlow::Break(());
     383                 :         }
     384                 : 
     385                 :         ControlFlow::Continue(())
     386                 :     }
     387                 : 
     388                 :     /// Recompute the values which would cause on-demand downloads during restart.
     389              18 :     #[instrument(skip_all)]
     390                 :     async fn imitate_timeline_cached_layer_accesses(
     391                 :         &self,
     392                 :         cancel: &CancellationToken,
     393                 :         ctx: &RequestContext,
     394                 :     ) {
     395                 :         let lsn = self.get_last_record_lsn();
     396                 : 
     397                 :         // imitate on-restart initial logical size
     398                 :         let size = self
     399                 :             .calculate_logical_size(
     400                 :                 lsn,
     401                 :                 LogicalSizeCalculationCause::EvictionTaskImitation,
     402                 :                 cancel.clone(),
     403                 :                 ctx,
     404                 :             )
     405                 :             .instrument(info_span!("calculate_logical_size"))
     406                 :             .await;
     407                 : 
     408                 :         match &size {
     409                 :             Ok(_size) => {
     410                 :                 // good, don't log it to avoid confusion
     411                 :             }
     412                 :             Err(_) => {
     413                 :                 // we have known issues for which we already log this on consumption metrics,
     414                 :                 // gc, and compaction. leave logging out for now.
     415                 :                 //
     416                 :                 // https://github.com/neondatabase/neon/issues/2539
     417                 :             }
     418                 :         }
     419                 : 
     420                 :         // imitate repartitioning on first compaction
     421                 :         if let Err(e) = self
     422                 :             .collect_keyspace(lsn, ctx)
     423                 :             .instrument(info_span!("collect_keyspace"))
     424                 :             .await
     425                 :         {
     426                 :             // if this failed, we probably failed logical size because these use the same keys
     427                 :             if size.is_err() {
     428                 :                 // ignore, see above comment
     429                 :             } else {
     430 UBC           0 :                 warn!(
     431               0 :                     "failed to collect keyspace but succeeded in calculating logical size: {e:#}"
     432               0 :                 );
     433                 :             }
     434                 :         }
     435                 :     }
     436                 : 
     437                 :     // Imitate the synthetic size calculation done by the consumption_metrics module.
     438 CBC          24 :     #[instrument(skip_all)]
     439                 :     async fn imitate_synthetic_size_calculation_worker(
     440                 :         &self,
     441                 :         tenant: &Arc<Tenant>,
     442                 :         ctx: &RequestContext,
     443                 :         cancel: &CancellationToken,
     444                 :     ) {
     445                 :         if self.conf.metric_collection_endpoint.is_none() {
     446                 :             // We don't start the consumption metrics task if this is not set in the config.
     447                 :             // So, no need to imitate the accesses in that case.
     448                 :             return;
     449                 :         }
     450                 : 
     451                 :         // The consumption metrics are collected on a per-tenant basis, by a single
     452                 :         // global background loop.
     453                 :         // It limits the number of synthetic size calculations using the global
     454                 :         // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
     455                 :         // the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
     456                 :         //
     457                 :         // If we used that same semaphore here, then we'd compete for the
     458                 :         // same permits, which may impact timeliness of consumption metrics.
     459                 :         // That is a no-go, as consumption metrics are much more important
     460                 :         // than what we do here.
     461                 :         //
     462                 :         // So, we have a separate semaphore, initialized to the same
     463                 :         // number of permits as the `concurrent_tenant_size_logical_size_queries`.
     464                 :         // In the worst case, we would have twice the number of concurrent size calculations.
     465                 :         // But in practice, the `p.threshold` >> `consumption metric interval`, and
     466                 :         // we spread out the eviction task using `random_init_delay`.
     467                 :         // So, the chance of the worst case is quite low in practice.
     468                 :         // It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
     469                 :         // So, we must coordinate with the other eviction tasks of this tenant.
     470                 :         let limit = self
     471                 :             .conf
     472                 :             .eviction_task_immitated_concurrent_logical_size_queries
     473                 :             .inner();
     474                 : 
     475                 :         let mut throwaway_cache = HashMap::new();
     476                 :         let gather = crate::tenant::size::gather_inputs(
     477                 :             tenant,
     478                 :             limit,
     479                 :             None,
     480                 :             &mut throwaway_cache,
     481                 :             LogicalSizeCalculationCause::EvictionTaskImitation,
     482                 :             ctx,
     483                 :         )
     484                 :         .instrument(info_span!("gather_inputs"));
     485                 : 
     486              12 :         tokio::select! {
     487                 :             _ = cancel.cancelled() => {}
     488               6 :             gather_result = gather => {
     489                 :                 match gather_result {
     490                 :                     Ok(_) => {},
     491                 :                     Err(e) => {
     492                 :                         // We don't care about the result, but, if it failed, we should log it,
     493                 :                         // since consumption metric might be hitting the cached value and
     494                 :                         // thus not encountering this error.
     495 UBC           0 :                         warn!("failed to imitate synthetic size calculation accesses: {e:#}")
     496                 :                     }
     497                 :                 }
     498                 :            }
     499                 :         }
     500                 :     }
     501                 : }
        

Generated by: LCOV version 2.1-beta