LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - eviction_task.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 80.8 % 146 118
Test Date: 2024-02-07 07:37:29 Functions: 51.4 % 35 18

            Line data    Source code
       1              : //! The per-timeline layer eviction task, which evicts data which has not been accessed for more
       2              : //! than a given threshold.
       3              : //!
       4              : //! Data includes all kinds of caches, namely:
       5              : //! - (in-memory layers)
       6              : //! - on-demand downloaded layer files on disk
       7              : //! - (cached layer file pages)
       8              : //! - derived data from layer file contents, namely:
       9              : //!     - initial logical size
      10              : //!     - partitioning
      11              : //!     - (other currently missing unknowns)
      12              : //!
      13              : //! Items with parentheses are not (yet) touched by this task.
      14              : //!
      15              : //! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
      16              : use std::{
      17              :     collections::HashMap,
      18              :     ops::ControlFlow,
      19              :     sync::Arc,
      20              :     time::{Duration, SystemTime},
      21              : };
      22              : 
      23              : use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
      24              : use tokio::time::Instant;
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
      27              : 
      28              : use crate::{
      29              :     context::{DownloadBehavior, RequestContext},
      30              :     pgdatadir_mapping::CollectKeySpaceError,
      31              :     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
      32              :     tenant::{
      33              :         tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
      34              :     },
      35              : };
      36              : 
      37              : use utils::completion;
      38              : 
      39              : use super::Timeline;
      40              : 
      41         1568 : #[derive(Default)]
      42              : pub struct EvictionTaskTimelineState {
      43              :     last_layer_access_imitation: Option<tokio::time::Instant>,
      44              : }
      45              : 
      46          942 : #[derive(Default)]
      47              : pub struct EvictionTaskTenantState {
      48              :     last_layer_access_imitation: Option<Instant>,
      49              : }
      50              : 
      51              : impl Timeline {
      52         1235 :     pub(super) fn launch_eviction_task(
      53         1235 :         self: &Arc<Self>,
      54         1235 :         background_tasks_can_start: Option<&completion::Barrier>,
      55         1235 :     ) {
      56         1235 :         let self_clone = Arc::clone(self);
      57         1235 :         let background_tasks_can_start = background_tasks_can_start.cloned();
      58         1235 :         task_mgr::spawn(
      59         1235 :             BACKGROUND_RUNTIME.handle(),
      60         1235 :             TaskKind::Eviction,
      61         1235 :             Some(self.tenant_shard_id),
      62         1235 :             Some(self.timeline_id),
      63         1235 :             &format!(
      64         1235 :                 "layer eviction for {}/{}",
      65         1235 :                 self.tenant_shard_id, self.timeline_id
      66         1235 :             ),
      67         1235 :             false,
      68         1235 :             async move {
      69         1235 :                 let cancel = task_mgr::shutdown_token();
      70         1235 :                 tokio::select! {
      71         1235 :                     _ = cancel.cancelled() => { return Ok(()); }
      72         1235 :                     _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
      73         1235 :                 };
      74              : 
      75         4157 :                 self_clone.eviction_task(cancel).await;
      76          545 :                 Ok(())
      77         1235 :             },
      78         1235 :         );
      79         1235 :     }
      80              : 
      81         1235 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
      82              :     async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
      83              :         use crate::tenant::tasks::random_init_delay;
      84              :         {
      85              :             let policy = self.get_eviction_policy();
      86              :             let period = match policy {
      87              :                 EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
      88              :                 EvictionPolicy::NoEviction => Duration::from_secs(10),
      89              :             };
      90              :             if random_init_delay(period, &cancel).await.is_err() {
      91              :                 return;
      92              :             }
      93              :         }
      94              : 
      95              :         let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
      96              :         loop {
      97              :             let policy = self.get_eviction_policy();
      98              :             let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;
      99              : 
     100              :             match cf {
     101              :                 ControlFlow::Break(()) => break,
     102              :                 ControlFlow::Continue(sleep_until) => {
     103              :                     if tokio::time::timeout_at(sleep_until, cancel.cancelled())
     104              :                         .await
     105              :                         .is_ok()
     106              :                     {
     107              :                         break;
     108              :                     }
     109              :                 }
     110              :             }
     111              :         }
     112              :     }
     113              : 
     114         1353 :     #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
     115              :     async fn eviction_iteration(
     116              :         self: &Arc<Self>,
     117              :         policy: &EvictionPolicy,
     118              :         cancel: &CancellationToken,
     119              :         ctx: &RequestContext,
     120              :     ) -> ControlFlow<(), Instant> {
     121            0 :         debug!("eviction iteration: {policy:?}");
     122              :         match policy {
     123              :             EvictionPolicy::NoEviction => {
     124              :                 // check again in 10 seconds; XXX config watch mechanism
     125              :                 ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
     126              :             }
     127              :             EvictionPolicy::LayerAccessThreshold(p) => {
     128              :                 let start = Instant::now();
     129              :                 match self.eviction_iteration_threshold(p, cancel, ctx).await {
     130              :                     ControlFlow::Break(()) => return ControlFlow::Break(()),
     131              :                     ControlFlow::Continue(()) => (),
     132              :                 }
     133              :                 let elapsed = start.elapsed();
     134              :                 crate::tenant::tasks::warn_when_period_overrun(
     135              :                     elapsed,
     136              :                     p.period,
     137              :                     BackgroundLoopKind::Eviction,
     138              :                 );
     139              :                 crate::metrics::EVICTION_ITERATION_DURATION
     140              :                     .get_metric_with_label_values(&[
     141              :                         &format!("{}", p.period.as_secs()),
     142              :                         &format!("{}", p.threshold.as_secs()),
     143              :                     ])
     144              :                     .unwrap()
     145              :                     .observe(elapsed.as_secs_f64());
     146              :                 ControlFlow::Continue(start + p.period)
     147              :             }
     148              :         }
     149              :     }
     150              : 
     151           29 :     async fn eviction_iteration_threshold(
     152           29 :         self: &Arc<Self>,
     153           29 :         p: &EvictionPolicyLayerAccessThreshold,
     154           29 :         cancel: &CancellationToken,
     155           29 :         ctx: &RequestContext,
     156           29 :     ) -> ControlFlow<()> {
     157           29 :         let now = SystemTime::now();
     158           29 : 
     159           29 :         let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
     160           29 :             BackgroundLoopKind::Eviction,
     161           29 :             ctx,
     162           29 :         );
     163              : 
     164           29 :         let _permit = tokio::select! {
     165           29 :             permit = acquire_permit => permit,
     166              :             _ = cancel.cancelled() => return ControlFlow::Break(()),
     167              :             _ = self.cancel.cancelled() => return ControlFlow::Break(()),
     168              :         };
     169              : 
     170              :         // If we evict layers but keep cached values derived from those layers, then
     171              :         // we face a storm of on-demand downloads after pageserver restart.
     172              :         // The reason is that the restart empties the caches, and so, the values
     173              :         // need to be re-computed by accessing layers, which we evicted while the
     174              :         // caches were filled.
     175              :         //
     176              :         // Solutions here would be one of the following:
     177              :         // 1. Have a persistent cache.
     178              :         // 2. Count every access to a cached value to the access stats of all layers
     179              :         //    that were accessed to compute the value in the first place.
     180              :         // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
     181              :         //    get re-computed from layers, thereby counting towards layer access stats.
     182              :         // 4. Make the eviction task imitate the layer accesses that typically hit caches.
     183              :         //
     184              :         // We follow approach (4) here because in Neon prod deployment:
     185              :         // - page cache is quite small => high churn => low hit rate
     186              :         //   => eviction gets correct access stats
     187              :         // - value-level caches such as logical size & repartition have a high hit rate,
     188              :         //   especially for inactive tenants
     189              :         //   => eviction sees zero accesses for these
     190              :         //   => they cause the on-demand download storm on pageserver restart
     191              :         //
     192              :         // We should probably move to persistent caches in the future, or avoid
     193              :         // having inactive tenants attached to pageserver in the first place.
     194         1658 :         match self.imitate_layer_accesses(p, cancel, ctx).await {
     195            0 :             ControlFlow::Break(()) => return ControlFlow::Break(()),
     196           29 :             ControlFlow::Continue(()) => (),
     197           29 :         }
     198           29 : 
     199           29 :         #[allow(dead_code)]
     200           29 :         #[derive(Debug, Default)]
     201           29 :         struct EvictionStats {
     202           29 :             candidates: usize,
     203           29 :             evicted: usize,
     204           29 :             errors: usize,
     205           29 :             not_evictable: usize,
     206           29 :             skipped_for_shutdown: usize,
     207           29 :         }
     208           29 : 
     209           29 :         let mut stats = EvictionStats::default();
     210           29 :         // Gather layers for eviction.
     211           29 :         // NB: all the checks can be invalidated as soon as we release the layer map lock.
     212           29 :         // We don't want to hold the layer map lock during eviction.
     213           29 : 
     214           29 :         // So, we just need to deal with this.
     215           29 : 
     216           29 :         if self.remote_client.is_none() {
     217            0 :             error!("no remote storage configured, cannot evict layers");
     218            0 :             return ControlFlow::Continue(());
     219           29 :         }
     220           29 : 
     221           29 :         let mut js = tokio::task::JoinSet::new();
     222              :         {
     223           29 :             let guard = self.layers.read().await;
     224           29 :             let layers = guard.layer_map();
     225         1716 :             for hist_layer in layers.iter_historic_layers() {
     226         1716 :                 let hist_layer = guard.get_from_desc(&hist_layer);
     227              : 
     228              :                 // guard against eviction while we inspect it; it might be that eviction_task and
     229              :                 // disk_usage_eviction_task both select the same layers to be evicted, and
     230              :                 // seemingly free up double the space. both succeeding is of no consequence.
     231         1716 :                 let guard = match hist_layer.keep_resident().await {
     232         1131 :                     Ok(Some(l)) => l,
     233          585 :                     Ok(None) => continue,
     234            0 :                     Err(e) => {
     235              :                         // these should not happen, but we cannot make them statically impossible right
     236              :                         // now.
     237            0 :                         tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
     238            0 :                         continue;
     239              :                     }
     240              :                 };
     241              : 
     242         1131 :                 let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
     243            0 :                     // We only use this fallback if there's an implementation error.
     244            0 :                     // `latest_activity` already does rate-limited warn!() log.
     245            0 :                     debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
     246            0 :                     SystemTime::now()
     247         1131 :                 });
     248              : 
     249         1131 :                 let no_activity_for = match now.duration_since(last_activity_ts) {
     250          950 :                     Ok(d) => d,
     251          181 :                     Err(_e) => {
     252          181 :                         // We reach here if `now` < `last_activity_ts`, which can legitimately
     253          181 :                         // happen if there is an access between us getting `now`, and us getting
     254          181 :                         // the access stats from the layer.
     255          181 :                         //
     256          181 :                         // The other reason why it can happen is system clock skew because
     257          181 :                         // SystemTime::now() is not monotonic, so, even if there is no access
     258          181 :                         // to the layer after we get `now` at the beginning of this function,
     259          181 :                         // it could be that `now`  < `last_activity_ts`.
     260          181 :                         //
     261          181 :                         // To distinguish the cases, we would need to record `Instant`s in the
     262          181 :                         // access stats (i.e., monotonic timestamps), but then, the timestamp
     263          181 :                         // values in the access stats would need to be `Instant`s, and hence
     264          181 :                         // they would be meaningless outside of the pageserver process.
     265          181 :                         // At the time of writing, the trade-off is that access stats are more
     266          181 :                         // valuable than detecting clock skew.
     267          181 :                         continue;
     268              :                     }
     269              :                 };
     270          950 :                 let layer = guard.drop_eviction_guard();
     271          950 :                 if no_activity_for > p.threshold {
     272           38 :                     // this could cause a lot of allocations in some cases
     273           38 :                     js.spawn(async move { layer.evict_and_wait().await });
     274           38 :                     stats.candidates += 1;
     275          912 :                 }
     276              :             }
     277              :         };
     278              : 
     279           29 :         let join_all = async move {
     280           67 :             while let Some(next) = js.join_next().await {
     281            0 :                 match next {
     282           38 :                     Ok(Ok(())) => stats.evicted += 1,
     283            0 :                     Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
     284            0 :                         stats.not_evictable += 1;
     285            0 :                     }
     286            0 :                     Err(je) if je.is_cancelled() => unreachable!("not used"),
     287            0 :                     Err(je) if je.is_panic() => {
     288            0 :                         /* already logged */
     289            0 :                         stats.errors += 1;
     290            0 :                     }
     291            0 :                     Err(je) => tracing::error!("unknown JoinError: {je:?}"),
     292              :                 }
     293              :             }
     294           29 :             stats
     295           29 :         };
     296              : 
     297           29 :         tokio::select! {
     298           29 :             stats = join_all => {
     299              :                 if stats.candidates == stats.not_evictable {
     300            0 :                     debug!(stats=?stats, "eviction iteration complete");
     301              :                 } else if stats.errors > 0 || stats.not_evictable > 0 {
     302            0 :                     warn!(stats=?stats, "eviction iteration complete");
     303              :                 } else {
     304            2 :                     info!(stats=?stats, "eviction iteration complete");
     305              :                 }
     306              :             }
     307              :             _ = cancel.cancelled() => {
     308              :                 // just drop the joinset to "abort"
     309              :             }
     310              :         }
     311              : 
     312           29 :         ControlFlow::Continue(())
     313           29 :     }
     314              : 
     315           29 :     #[instrument(skip_all)]
     316              :     async fn imitate_layer_accesses(
     317              :         &self,
     318              :         p: &EvictionPolicyLayerAccessThreshold,
     319              :         cancel: &CancellationToken,
     320              :         ctx: &RequestContext,
     321              :     ) -> ControlFlow<()> {
     322              :         if !self.tenant_shard_id.is_zero() {
     323              :             // Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size
     324              :             // for consumption metrics (consumption metrics are only sent from shard 0).  We may therefore
     325              :             // skip imitating logical size accesses for eviction purposes.
     326              :             return ControlFlow::Continue(());
     327              :         }
     328              : 
     329              :         let mut state = self.eviction_task_timeline_state.lock().await;
     330              : 
     331              :         // Only do the imitate_layer accesses approximately as often as the threshold.  A little
     332              :         // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
     333              :         let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);
     334              : 
     335              :         match state.last_layer_access_imitation {
     336              :             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
     337              :             _ => {
     338              :                 self.imitate_timeline_cached_layer_accesses(ctx).await;
     339              :                 state.last_layer_access_imitation = Some(tokio::time::Instant::now())
     340              :             }
     341              :         }
     342              :         drop(state);
     343              : 
     344              :         if cancel.is_cancelled() {
     345              :             return ControlFlow::Break(());
     346              :         }
     347              : 
     348              :         // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
     349              :         // Make one of the tenant's timelines draw the short straw and run the calculation.
     350              :         // The others wait until the calculation is done so that they take into account the
     351              :         // imitated accesses that the winner made.
     352              :         let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id, true) {
     353              :             Ok(t) => t,
     354              :             Err(_) => {
     355              :                 return ControlFlow::Break(());
     356              :             }
     357              :         };
     358              :         let mut state = tenant.eviction_task_tenant_state.lock().await;
     359              :         match state.last_layer_access_imitation {
     360              :             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
     361              :             _ => {
     362              :                 self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
     363              :                     .await;
     364              :                 state.last_layer_access_imitation = Some(tokio::time::Instant::now());
     365              :             }
     366              :         }
     367              :         drop(state);
     368              : 
     369              :         if cancel.is_cancelled() {
     370              :             return ControlFlow::Break(());
     371              :         }
     372              : 
     373              :         ControlFlow::Continue(())
     374              :     }
     375              : 
     376              :     /// Recompute the values which would cause on-demand downloads during restart.
     377            0 :     #[instrument(skip_all)]
     378              :     async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
     379              :         let lsn = self.get_last_record_lsn();
     380              : 
     381              :         // imitate on-restart initial logical size
     382              :         let size = self
     383              :             .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
     384              :             .instrument(info_span!("calculate_logical_size"))
     385              :             .await;
     386              : 
     387              :         match &size {
     388              :             Ok(_size) => {
     389              :                 // good, don't log it to avoid confusion
     390              :             }
     391              :             Err(_) => {
     392              :                 // we have known issues for which we already log this on consumption metrics,
     393              :                 // gc, and compaction. leave logging out for now.
     394              :                 //
     395              :                 // https://github.com/neondatabase/neon/issues/2539
     396              :             }
     397              :         }
     398              : 
     399              :         // imitate repartitioning on first compaction
     400              :         if let Err(e) = self
     401              :             .collect_keyspace(lsn, ctx)
     402              :             .instrument(info_span!("collect_keyspace"))
     403              :             .await
     404              :         {
     405              :             // if this failed, we probably failed logical size because these use the same keys
     406              :             if size.is_err() {
     407              :                 // ignore, see above comment
     408              :             } else {
     409              :                 match e {
     410              :                     CollectKeySpaceError::Cancelled => {
     411              :                         // Shutting down, ignore
     412              :                     }
     413              :                     err => {
     414            0 :                         warn!(
     415            0 :                             "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
     416            0 :                         );
     417              :                     }
     418              :                 }
     419              :             }
     420              :         }
     421              :     }
     422              : 
     423              :     // Imitate the synthetic size calculation done by the consumption_metrics module.
     424            6 :     #[instrument(skip_all)]
     425              :     async fn imitate_synthetic_size_calculation_worker(
     426              :         &self,
     427              :         tenant: &Arc<Tenant>,
     428              :         cancel: &CancellationToken,
     429              :         ctx: &RequestContext,
     430              :     ) {
     431              :         if self.conf.metric_collection_endpoint.is_none() {
     432              :             // We don't start the consumption metrics task if this is not set in the config.
     433              :             // So, no need to imitate the accesses in that case.
     434              :             return;
     435              :         }
     436              : 
     437              :         // The consumption metrics are collected on a per-tenant basis, by a single
     438              :         // global background loop.
     439              :         // It limits the number of synthetic size calculations using the global
     440              :         // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
     441              :         // the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
     442              :         //
     443              :         // If we used that same semaphore here, then we'd compete for the
     444              :         // same permits, which may impact timeliness of consumption metrics.
     445              :         // That is a no-go, as consumption metrics are much more important
     446              :         // than what we do here.
     447              :         //
     448              :         // So, we have a separate semaphore, initialized to the same
     449              :         // number of permits as the `concurrent_tenant_size_logical_size_queries`.
     450              :         // In the worst case, we would have twice the number of concurrent size calculations.
     451              :         // But in practice, the `p.threshold` >> `consumption metric interval`, and
     452              :         // we spread out the eviction task using `random_init_delay`.
     453              :         // So, the chance of the worst case is quite low in practice.
     454              :         // It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
     455              :         // So, we must coordinate with the other eviction tasks of this tenant.
     456              :         let limit = self
     457              :             .conf
     458              :             .eviction_task_immitated_concurrent_logical_size_queries
     459              :             .inner();
     460              : 
     461              :         let mut throwaway_cache = HashMap::new();
     462              :         let gather = crate::tenant::size::gather_inputs(
     463              :             tenant,
     464              :             limit,
     465              :             None,
     466              :             &mut throwaway_cache,
     467              :             LogicalSizeCalculationCause::EvictionTaskImitation,
     468              :             cancel,
     469              :             ctx,
     470              :         )
     471              :         .instrument(info_span!("gather_inputs"));
     472              : 
     473           12 :         tokio::select! {
     474              :             _ = cancel.cancelled() => {}
     475            6 :             gather_result = gather => {
     476              :                 match gather_result {
     477              :                     Ok(_) => {},
     478              :                     Err(e) => {
     479              :                         // We don't care about the result, but, if it failed, we should log it,
     480              :                         // since consumption metric might be hitting the cached value and
     481              :                         // thus not encountering this error.
     482            0 :                         warn!("failed to imitate synthetic size calculation accesses: {e:#}")
     483              :                     }
     484              :                 }
     485              :            }
     486              :         }
     487              :     }
     488              : }
        

Generated by: LCOV version 2.1-beta