LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - eviction_task.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 78.8 % 170 134
Test Date: 2023-09-06 10:18:01 Functions: 61.1 % 36 22

            Line data    Source code
       1              : //! The per-timeline layer eviction task, which evicts data which has not been accessed for more
       2              : //! than a given threshold.
       3              : //!
       4              : //! Data includes all kinds of caches, namely:
       5              : //! - (in-memory layers)
       6              : //! - on-demand downloaded layer files on disk
       7              : //! - (cached layer file pages)
       8              : //! - derived data from layer file contents, namely:
       9              : //!     - initial logical size
      10              : //!     - partitioning
      11              : //!     - (other currently missing unknowns)
      12              : //!
      13              : //! Items with parentheses are not (yet) touched by this task.
      14              : //!
      15              : //! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
      16              : use std::{
      17              :     collections::HashMap,
      18              :     ops::ControlFlow,
      19              :     sync::Arc,
      20              :     time::{Duration, SystemTime},
      21              : };
      22              : 
      23              : use tokio::time::Instant;
      24              : use tokio_util::sync::CancellationToken;
      25              : use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
      26              : 
      27              : use crate::{
      28              :     context::{DownloadBehavior, RequestContext},
      29              :     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
      30              :     tenant::{
      31              :         config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
      32              :         storage_layer::PersistentLayer,
      33              :         timeline::EvictionError,
      34              :         LogicalSizeCalculationCause, Tenant,
      35              :     },
      36              : };
      37              : 
      38              : use utils::completion;
      39              : 
      40              : use super::Timeline;
      41              : 
      42         1394 : #[derive(Default)]
      43              : pub struct EvictionTaskTimelineState {
      44              :     last_layer_access_imitation: Option<tokio::time::Instant>,
      45              : }
      46              : 
      47          779 : #[derive(Default)]
      48              : pub struct EvictionTaskTenantState {
      49              :     last_layer_access_imitation: Option<Instant>,
      50              : }
      51              : 
      52              : impl Timeline {
      53         1190 :     pub(super) fn launch_eviction_task(
      54         1190 :         self: &Arc<Self>,
      55         1190 :         background_tasks_can_start: Option<&completion::Barrier>,
      56         1190 :     ) {
      57         1190 :         let self_clone = Arc::clone(self);
      58         1190 :         let background_tasks_can_start = background_tasks_can_start.cloned();
      59         1190 :         task_mgr::spawn(
      60         1190 :             BACKGROUND_RUNTIME.handle(),
      61         1190 :             TaskKind::Eviction,
      62         1190 :             Some(self.tenant_id),
      63         1190 :             Some(self.timeline_id),
      64         1190 :             &format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id),
      65         1190 :             false,
      66         1190 :             async move {
      67         1190 :                 let cancel = task_mgr::shutdown_token();
      68         1356 :                 tokio::select! {
      69         1356 :                     _ = cancel.cancelled() => { return Ok(()); }
      70         1356 :                     _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
      71         1356 :                 };
      72              : 
      73         2279 :                 self_clone.eviction_task(cancel).await;
      74          478 :                 Ok(())
      75         1190 :             },
      76         1190 :         );
      77         1190 :     }
      78              : 
      79         3291 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
      80              :     async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
      81              :         use crate::tenant::tasks::random_init_delay;
      82              :         {
      83              :             let policy = self.get_eviction_policy();
      84              :             let period = match policy {
      85              :                 EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
      86              :                 EvictionPolicy::NoEviction => Duration::from_secs(10),
      87              :             };
      88              :             if random_init_delay(period, &cancel).await.is_err() {
      89              :                 return;
      90              :             }
      91              :         }
      92              : 
      93              :         let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
      94              :         loop {
      95              :             let policy = self.get_eviction_policy();
      96              :             let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;
      97              : 
      98              :             match cf {
      99              :                 ControlFlow::Break(()) => break,
     100              :                 ControlFlow::Continue(sleep_until) => {
     101              :                     if tokio::time::timeout_at(sleep_until, cancel.cancelled())
     102              :                         .await
     103              :                         .is_ok()
     104              :                     {
     105              :                         break;
     106              :                     }
     107              :                 }
     108              :             }
     109              :         }
     110              :     }
     111              : 
     112         1930 :     #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
     113              :     async fn eviction_iteration(
     114              :         self: &Arc<Self>,
     115              :         policy: &EvictionPolicy,
     116              :         cancel: &CancellationToken,
     117              :         ctx: &RequestContext,
     118              :     ) -> ControlFlow<(), Instant> {
     119            0 :         debug!("eviction iteration: {policy:?}");
     120              :         match policy {
     121              :             EvictionPolicy::NoEviction => {
     122              :                 // check again in 10 seconds; XXX config watch mechanism
     123              :                 ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
     124              :             }
     125              :             EvictionPolicy::LayerAccessThreshold(p) => {
     126              :                 let start = Instant::now();
     127              :                 match self.eviction_iteration_threshold(p, cancel, ctx).await {
     128              :                     ControlFlow::Break(()) => return ControlFlow::Break(()),
     129              :                     ControlFlow::Continue(()) => (),
     130              :                 }
     131              :                 let elapsed = start.elapsed();
     132              :                 crate::tenant::tasks::warn_when_period_overrun(elapsed, p.period, "eviction");
     133              :                 crate::metrics::EVICTION_ITERATION_DURATION
     134              :                     .get_metric_with_label_values(&[
     135              :                         &format!("{}", p.period.as_secs()),
     136              :                         &format!("{}", p.threshold.as_secs()),
     137              :                     ])
     138              :                     .unwrap()
     139              :                     .observe(elapsed.as_secs_f64());
     140              :                 ControlFlow::Continue(start + p.period)
     141              :             }
     142              :         }
     143              :     }
     144              : 
     145           27 :     async fn eviction_iteration_threshold(
     146           27 :         self: &Arc<Self>,
     147           27 :         p: &EvictionPolicyLayerAccessThreshold,
     148           27 :         cancel: &CancellationToken,
     149           27 :         ctx: &RequestContext,
     150           27 :     ) -> ControlFlow<()> {
     151           27 :         let now = SystemTime::now();
     152           27 : 
     153           27 :         // If we evict layers but keep cached values derived from those layers, then
     154           27 :         // we face a storm of on-demand downloads after pageserver restart.
     155           27 :         // The reason is that the restart empties the caches, and so, the values
     156           27 :         // need to be re-computed by accessing layers, which we evicted while the
     157           27 :         // caches were filled.
     158           27 :         //
     159           27 :         // Solutions here would be one of the following:
     160           27 :         // 1. Have a persistent cache.
     161           27 :         // 2. Count every access to a cached value to the access stats of all layers
     162           27 :         //    that were accessed to compute the value in the first place.
     163           27 :         // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
     164           27 :         //    get re-computed from layers, thereby counting towards layer access stats.
     165           27 :         // 4. Make the eviction task imitate the layer accesses that typically hit caches.
     166           27 :         //
     167           27 :         // We follow approach (4) here because in Neon prod deployment:
     168           27 :         // - page cache is quite small => high churn => low hit rate
     169           27 :         //   => eviction gets correct access stats
     170           27 :         // - value-level caches such as logical size & repatition have a high hit rate,
     171           27 :         //   especially for inactive tenants
     172           27 :         //   => eviction sees zero accesses for these
     173           27 :         //   => they cause the on-demand download storm on pageserver restart
     174           27 :         //
     175           27 :         // We should probably move to persistent caches in the future, or avoid
     176           27 :         // having inactive tenants attached to pageserver in the first place.
     177          839 :         match self.imitate_layer_accesses(p, cancel, ctx).await {
     178            0 :             ControlFlow::Break(()) => return ControlFlow::Break(()),
     179           27 :             ControlFlow::Continue(()) => (),
     180           27 :         }
     181           27 : 
     182           27 :         #[allow(dead_code)]
     183           27 :         #[derive(Debug, Default)]
     184           27 :         struct EvictionStats {
     185           27 :             candidates: usize,
     186           27 :             evicted: usize,
     187           27 :             errors: usize,
     188           27 :             not_evictable: usize,
     189           27 :             skipped_for_shutdown: usize,
     190           27 :         }
     191           27 : 
     192           27 :         let mut stats = EvictionStats::default();
     193              :         // Gather layers for eviction.
     194              :         // NB: all the checks can be invalidated as soon as we release the layer map lock.
     195              :         // We don't want to hold the layer map lock during eviction.
     196              :         // So, we just need to deal with this.
     197           27 :         let candidates: Vec<Arc<dyn PersistentLayer>> = {
     198           27 :             let guard = self.layers.read().await;
     199           27 :             let layers = guard.layer_map();
     200           27 :             let mut candidates = Vec::new();
     201         1595 :             for hist_layer in layers.iter_historic_layers() {
     202         1595 :                 let hist_layer = guard.get_from_desc(&hist_layer);
     203         1595 :                 if hist_layer.is_remote_layer() {
     204          608 :                     continue;
     205          987 :                 }
     206          987 : 
     207          987 :                 let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
     208            0 :                     // We only use this fallback if there's an implementation error.
     209            0 :                     // `latest_activity` already does rate-limited warn!() log.
     210            0 :                     debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
     211            0 :                     SystemTime::now()
     212          987 :                 });
     213              : 
     214          987 :                 let no_activity_for = match now.duration_since(last_activity_ts) {
     215          828 :                     Ok(d) => d,
     216          159 :                     Err(_e) => {
     217          159 :                         // We reach here if `now` < `last_activity_ts`, which can legitimately
     218          159 :                         // happen if there is an access between us getting `now`, and us getting
     219          159 :                         // the access stats from the layer.
     220          159 :                         //
     221          159 :                         // The other reason why it can happen is system clock skew because
     222          159 :                         // SystemTime::now() is not monotonic, so, even if there is no access
     223          159 :                         // to the layer after we get `now` at the beginning of this function,
     224          159 :                         // it could be that `now`  < `last_activity_ts`.
     225          159 :                         //
     226          159 :                         // To distinguish the cases, we would need to record `Instant`s in the
     227          159 :                         // access stats (i.e., monotonic timestamps), but then, the timestamps
     228          159 :                         // values in the access stats would need to be `Instant`'s, and hence
     229          159 :                         // they would be meaningless outside of the pageserver process.
     230          159 :                         // At the time of writing, the trade-off is that access stats are more
     231          159 :                         // valuable than detecting clock skew.
     232          159 :                         continue;
     233              :                     }
     234              :                 };
     235          828 :                 if no_activity_for > p.threshold {
     236           38 :                     candidates.push(hist_layer)
     237          790 :                 }
     238              :             }
     239           27 :             candidates
     240           27 :         };
     241           27 :         stats.candidates = candidates.len();
     242              : 
     243           27 :         let remote_client = match self.remote_client.as_ref() {
     244              :             None => {
     245            0 :                 error!(
     246            0 :                     num_candidates = candidates.len(),
     247            0 :                     "no remote storage configured, cannot evict layers"
     248            0 :                 );
     249            0 :                 return ControlFlow::Continue(());
     250              :             }
     251           27 :             Some(c) => c,
     252              :         };
     253              : 
     254           27 :         let results = match self
     255           27 :             .evict_layer_batch(remote_client, &candidates[..], cancel.clone())
     256            2 :             .await
     257              :         {
     258            0 :             Err(pre_err) => {
     259            0 :                 stats.errors += candidates.len();
     260            0 :                 error!("could not do any evictions: {pre_err:#}");
     261            0 :                 return ControlFlow::Continue(());
     262              :             }
     263           27 :             Ok(results) => results,
     264           27 :         };
     265           27 :         assert_eq!(results.len(), candidates.len());
     266           38 :         for (l, result) in candidates.iter().zip(results) {
     267            0 :             match result {
     268            0 :                 None => {
     269            0 :                     stats.skipped_for_shutdown += 1;
     270            0 :                 }
     271           38 :                 Some(Ok(())) => {
     272           38 :                     stats.evicted += 1;
     273           38 :                 }
     274            0 :                 Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
     275            0 :                     stats.not_evictable += 1;
     276            0 :                 }
     277            0 :                 Some(Err(EvictionError::FileNotFound)) => {
     278            0 :                     // compaction/gc removed the file while we were waiting on layer_removal_cs
     279            0 :                     stats.not_evictable += 1;
     280            0 :                 }
     281              :                 Some(Err(
     282            0 :                     e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
     283              :                 )) => {
     284            0 :                     let e = utils::error::report_compact_sources(&e);
     285            0 :                     warn!(layer = %l, "failed to evict layer: {e}");
     286            0 :                     stats.not_evictable += 1;
     287              :                 }
     288              :             }
     289              :         }
     290           27 :         if stats.candidates == stats.not_evictable {
     291            0 :             debug!(stats=?stats, "eviction iteration complete");
     292            1 :         } else if stats.errors > 0 || stats.not_evictable > 0 {
     293            0 :             warn!(stats=?stats, "eviction iteration complete");
     294              :         } else {
     295            1 :             info!(stats=?stats, "eviction iteration complete");
     296              :         }
     297           27 :         ControlFlow::Continue(())
     298           27 :     }
     299              : 
     300          108 :     #[instrument(skip_all)]
     301              :     async fn imitate_layer_accesses(
     302              :         &self,
     303              :         p: &EvictionPolicyLayerAccessThreshold,
     304              :         cancel: &CancellationToken,
     305              :         ctx: &RequestContext,
     306              :     ) -> ControlFlow<()> {
     307              :         let mut state = self.eviction_task_timeline_state.lock().await;
     308              : 
     309              :         // Only do the imitate_layer accesses approximately as often as the threshold.  A little
     310              :         // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
     311              :         let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);
     312              : 
     313              :         match state.last_layer_access_imitation {
     314              :             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
     315              :             _ => {
     316              :                 self.imitate_timeline_cached_layer_accesses(cancel, ctx)
     317              :                     .await;
     318              :                 state.last_layer_access_imitation = Some(tokio::time::Instant::now())
     319              :             }
     320              :         }
     321              :         drop(state);
     322              : 
     323              :         if cancel.is_cancelled() {
     324              :             return ControlFlow::Break(());
     325              :         }
     326              : 
     327              :         // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
     328              :         // Make one of the tenant's timelines draw the short straw and run the calculation.
     329              :         // The others wait until the calculation is done so that they take into account the
     330              :         // imitated accesses that the winner made.
     331              :         let Ok(tenant) = crate::tenant::mgr::get_tenant(self.tenant_id, true).await else {
     332              :             // likely, we're shutting down
     333              :             return ControlFlow::Break(());
     334              :         };
     335              :         let mut state = tenant.eviction_task_tenant_state.lock().await;
     336              :         match state.last_layer_access_imitation {
     337              :             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
     338              :             _ => {
     339              :                 self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
     340              :                     .await;
     341              :                 state.last_layer_access_imitation = Some(tokio::time::Instant::now());
     342              :             }
     343              :         }
     344              :         drop(state);
     345              : 
     346              :         if cancel.is_cancelled() {
     347              :             return ControlFlow::Break(());
     348              :         }
     349              : 
     350              :         ControlFlow::Continue(())
     351              :     }
     352              : 
     353              :     /// Recompute the values which would cause on-demand downloads during restart.
     354           18 :     #[instrument(skip_all)]
     355              :     async fn imitate_timeline_cached_layer_accesses(
     356              :         &self,
     357              :         cancel: &CancellationToken,
     358              :         ctx: &RequestContext,
     359              :     ) {
     360              :         let lsn = self.get_last_record_lsn();
     361              : 
     362              :         // imitiate on-restart initial logical size
     363              :         let size = self
     364              :             .calculate_logical_size(
     365              :                 lsn,
     366              :                 LogicalSizeCalculationCause::EvictionTaskImitation,
     367              :                 cancel.clone(),
     368              :                 ctx,
     369              :             )
     370              :             .instrument(info_span!("calculate_logical_size"))
     371              :             .await;
     372              : 
     373              :         match &size {
     374              :             Ok(_size) => {
     375              :                 // good, don't log it to avoid confusion
     376              :             }
     377              :             Err(_) => {
     378              :                 // we have known issues for which we already log this on consumption metrics,
     379              :                 // gc, and compaction. leave logging out for now.
     380              :                 //
     381              :                 // https://github.com/neondatabase/neon/issues/2539
     382              :             }
     383              :         }
     384              : 
     385              :         // imitiate repartiting on first compactation
     386              :         if let Err(e) = self
     387              :             .collect_keyspace(lsn, ctx)
     388              :             .instrument(info_span!("collect_keyspace"))
     389              :             .await
     390              :         {
     391              :             // if this failed, we probably failed logical size because these use the same keys
     392              :             if size.is_err() {
     393              :                 // ignore, see above comment
     394              :             } else {
     395            0 :                 warn!(
     396            0 :                     "failed to collect keyspace but succeeded in calculating logical size: {e:#}"
     397            0 :                 );
     398              :             }
     399              :         }
     400              :     }
     401              : 
     402              :     // Imitate the synthetic size calculation done by the consumption_metrics module.
     403           24 :     #[instrument(skip_all)]
     404              :     async fn imitate_synthetic_size_calculation_worker(
     405              :         &self,
     406              :         tenant: &Arc<Tenant>,
     407              :         ctx: &RequestContext,
     408              :         cancel: &CancellationToken,
     409              :     ) {
     410              :         if self.conf.metric_collection_endpoint.is_none() {
     411              :             // We don't start the consumption metrics task if this is not set in the config.
     412              :             // So, no need to imitate the accesses in that case.
     413              :             return;
     414              :         }
     415              : 
     416              :         // The consumption metrics are collected on a per-tenant basis, by a single
     417              :         // global background loop.
     418              :         // It limits the number of synthetic size calculations using the global
     419              :         // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
     420              :         // the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
     421              :         //
     422              :         // If we used that same semaphore here, then we'd compete for the
     423              :         // same permits, which may impact timeliness of consumption metrics.
     424              :         // That is a no-go, as consumption metrics are much more important
     425              :         // than what we do here.
     426              :         //
     427              :         // So, we have a separate semaphore, initialized to the same
     428              :         // number of permits as the `concurrent_tenant_size_logical_size_queries`.
     429              :         // In the worst, we would have twice the amount of concurrenct size calculations.
     430              :         // But in practice, the `p.threshold` >> `consumption metric interval`, and
     431              :         // we spread out the eviction task using `random_init_delay`.
     432              :         // So, the chance of the worst case is quite low in practice.
     433              :         // It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
     434              :         // So, we must coordinate with other with other eviction tasks of this tenant.
     435              :         let limit = self
     436              :             .conf
     437              :             .eviction_task_immitated_concurrent_logical_size_queries
     438              :             .inner();
     439              : 
     440              :         let mut throwaway_cache = HashMap::new();
     441              :         let gather = crate::tenant::size::gather_inputs(
     442              :             tenant,
     443              :             limit,
     444              :             None,
     445              :             &mut throwaway_cache,
     446              :             LogicalSizeCalculationCause::EvictionTaskImitation,
     447              :             ctx,
     448              :         )
     449              :         .instrument(info_span!("gather_inputs"));
     450              : 
     451           12 :         tokio::select! {
     452              :             _ = cancel.cancelled() => {}
     453            6 :             gather_result = gather => {
     454              :                 match gather_result {
     455              :                     Ok(_) => {},
     456              :                     Err(e) => {
     457              :                         // We don't care about the result, but, if it failed, we should log it,
     458              :                         // since consumption metric might be hitting the cached value and
     459              :                         // thus not encountering this error.
     460            0 :                         warn!("failed to imitate synthetic size calculation accesses: {e:#}")
     461              :                     }
     462              :                 }
     463              :            }
     464              :         }
     465              :     }
     466              : }
        

Generated by: LCOV version 2.1-beta