LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/timeline - eviction_task.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 80.0 % 140 112 28 112
Current Date: 2024-01-09 02:06:09 Functions: 51.4 % 35 18 17 18
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! The per-timeline layer eviction task, which evicts data which has not been accessed for more
       2                 : //! than a given threshold.
       3                 : //!
       4                 : //! Data includes all kinds of caches, namely:
       5                 : //! - (in-memory layers)
       6                 : //! - on-demand downloaded layer files on disk
       7                 : //! - (cached layer file pages)
       8                 : //! - derived data from layer file contents, namely:
       9                 : //!     - initial logical size
      10                 : //!     - partitioning
      11                 : //!     - (other currently missing unknowns)
      12                 : //!
      13                 : //! Items with parentheses are not (yet) touched by this task.
      14                 : //!
      15                 : //! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
      16                 : use std::{
      17                 :     collections::HashMap,
      18                 :     ops::ControlFlow,
      19                 :     sync::Arc,
      20                 :     time::{Duration, SystemTime},
      21                 : };
      22                 : 
      23                 : use tokio::time::Instant;
      24                 : use tokio_util::sync::CancellationToken;
      25                 : use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
      26                 : 
      27                 : use crate::{
      28                 :     context::{DownloadBehavior, RequestContext},
      29                 :     pgdatadir_mapping::CollectKeySpaceError,
      30                 :     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
      31                 :     tenant::{
      32                 :         config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
      33                 :         tasks::BackgroundLoopKind,
      34                 :         timeline::EvictionError,
      35                 :         LogicalSizeCalculationCause, Tenant,
      36                 :     },
      37                 : };
      38                 : 
      39                 : use utils::completion;
      40                 : 
      41                 : use super::Timeline;
      42                 : 
      43 CBC        1290 : #[derive(Default)]
      44                 : pub struct EvictionTaskTimelineState {
      45                 :     last_layer_access_imitation: Option<tokio::time::Instant>,
      46                 : }
      47                 : 
      48             778 : #[derive(Default)]
      49                 : pub struct EvictionTaskTenantState {
      50                 :     last_layer_access_imitation: Option<Instant>,
      51                 : }
      52                 : 
      53                 : impl Timeline {
      54            1102 :     pub(super) fn launch_eviction_task(
      55            1102 :         self: &Arc<Self>,
      56            1102 :         background_tasks_can_start: Option<&completion::Barrier>,
      57            1102 :     ) {
      58            1102 :         let self_clone = Arc::clone(self);
      59            1102 :         let background_tasks_can_start = background_tasks_can_start.cloned();
      60            1102 :         task_mgr::spawn(
      61            1102 :             BACKGROUND_RUNTIME.handle(),
      62            1102 :             TaskKind::Eviction,
      63            1102 :             Some(self.tenant_shard_id),
      64            1102 :             Some(self.timeline_id),
      65            1102 :             &format!(
      66            1102 :                 "layer eviction for {}/{}",
      67            1102 :                 self.tenant_shard_id, self.timeline_id
      68            1102 :             ),
      69            1102 :             false,
      70            1102 :             async move {
      71            1102 :                 let cancel = task_mgr::shutdown_token();
      72            1102 :                 tokio::select! {
      73            1102 :                     _ = cancel.cancelled() => { return Ok(()); }
      74            1102 :                     _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
      75            1102 :                 };
      76                 : 
      77            6389 :                 self_clone.eviction_task(cancel).await;
      78             471 :                 Ok(())
      79            1102 :             },
      80            1102 :         );
      81            1102 :     }
      82                 : 
      83            1102 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
      84                 :     async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
      85                 :         use crate::tenant::tasks::random_init_delay;
      86                 :         {
      87                 :             let policy = self.get_eviction_policy();
      88                 :             let period = match policy {
      89                 :                 EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
      90                 :                 EvictionPolicy::NoEviction => Duration::from_secs(10),
      91                 :             };
      92                 :             if random_init_delay(period, &cancel).await.is_err() {
      93                 :                 return;
      94                 :             }
      95                 :         }
      96                 : 
      97                 :         let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
      98                 :         loop {
      99                 :             let policy = self.get_eviction_policy();
     100                 :             let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;
     101                 : 
     102                 :             match cf {
     103                 :                 ControlFlow::Break(()) => break,
     104                 :                 ControlFlow::Continue(sleep_until) => {
     105                 :                     if tokio::time::timeout_at(sleep_until, cancel.cancelled())
     106                 :                         .await
     107                 :                         .is_ok()
     108                 :                     {
     109                 :                         break;
     110                 :                     }
     111                 :                 }
     112                 :             }
     113                 :         }
     114                 :     }
     115                 : 
     116            1382 :     #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
     117                 :     async fn eviction_iteration(
     118                 :         self: &Arc<Self>,
     119                 :         policy: &EvictionPolicy,
     120                 :         cancel: &CancellationToken,
     121                 :         ctx: &RequestContext,
     122                 :     ) -> ControlFlow<(), Instant> {
     123 UBC           0 :         debug!("eviction iteration: {policy:?}");
     124                 :         match policy {
     125                 :             EvictionPolicy::NoEviction => {
     126                 :                 // check again in 10 seconds; XXX config watch mechanism
     127                 :                 ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
     128                 :             }
     129                 :             EvictionPolicy::LayerAccessThreshold(p) => {
     130                 :                 let start = Instant::now();
     131                 :                 match self.eviction_iteration_threshold(p, cancel, ctx).await {
     132                 :                     ControlFlow::Break(()) => return ControlFlow::Break(()),
     133                 :                     ControlFlow::Continue(()) => (),
     134                 :                 }
     135                 :                 let elapsed = start.elapsed();
     136                 :                 crate::tenant::tasks::warn_when_period_overrun(
     137                 :                     elapsed,
     138                 :                     p.period,
     139                 :                     BackgroundLoopKind::Eviction,
     140                 :                 );
     141                 :                 crate::metrics::EVICTION_ITERATION_DURATION
     142                 :                     .get_metric_with_label_values(&[
     143                 :                         &format!("{}", p.period.as_secs()),
     144                 :                         &format!("{}", p.threshold.as_secs()),
     145                 :                     ])
     146                 :                     .unwrap()
     147                 :                     .observe(elapsed.as_secs_f64());
     148                 :                 ControlFlow::Continue(start + p.period)
     149                 :             }
     150                 :         }
     151                 :     }
     152                 : 
     153 CBC          36 :     async fn eviction_iteration_threshold(
     154              36 :         self: &Arc<Self>,
     155              36 :         p: &EvictionPolicyLayerAccessThreshold,
     156              36 :         cancel: &CancellationToken,
     157              36 :         ctx: &RequestContext,
     158              36 :     ) -> ControlFlow<()> {
     159              36 :         let now = SystemTime::now();
     160              36 : 
     161              36 :         let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
     162              36 :             BackgroundLoopKind::Eviction,
     163              36 :             ctx,
     164              36 :         );
     165                 : 
     166              36 :         let _permit = tokio::select! {
     167              36 :             permit = acquire_permit => permit,
     168                 :             _ = cancel.cancelled() => return ControlFlow::Break(()),
     169                 :             _ = self.cancel.cancelled() => return ControlFlow::Break(()),
     170                 :         };
     171                 : 
     172                 :         // If we evict layers but keep cached values derived from those layers, then
     173                 :         // we face a storm of on-demand downloads after pageserver restart.
     174                 :         // The reason is that the restart empties the caches, and so, the values
     175                 :         // need to be re-computed by accessing layers, which we evicted while the
     176                 :         // caches were filled.
     177                 :         //
     178                 :         // Solutions here would be one of the following:
     179                 :         // 1. Have a persistent cache.
     180                 :         // 2. Count every access to a cached value to the access stats of all layers
     181                 :         //    that were accessed to compute the value in the first place.
     182                 :         // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
     183                 :         //    get re-computed from layers, thereby counting towards layer access stats.
     184                 :         // 4. Make the eviction task imitate the layer accesses that typically hit caches.
     185                 :         //
     186                 :         // We follow approach (4) here because in Neon prod deployment:
     187                 :         // - page cache is quite small => high churn => low hit rate
     188                 :         //   => eviction gets correct access stats
     189                 :         // - value-level caches such as logical size & repartition have a high hit rate,
     190                 :         //   especially for inactive tenants
     191                 :         //   => eviction sees zero accesses for these
     192                 :         //   => they cause the on-demand download storm on pageserver restart
     193                 :         //
     194                 :         // We should probably move to persistent caches in the future, or avoid
     195                 :         // having inactive tenants attached to pageserver in the first place.
     196            3902 :         match self.imitate_layer_accesses(p, cancel, ctx).await {
     197 UBC           0 :             ControlFlow::Break(()) => return ControlFlow::Break(()),
     198 CBC          36 :             ControlFlow::Continue(()) => (),
     199              36 :         }
     200              36 : 
     201              36 :         #[allow(dead_code)]
     202              36 :         #[derive(Debug, Default)]
     203              36 :         struct EvictionStats {
     204              36 :             candidates: usize,
     205              36 :             evicted: usize,
     206              36 :             errors: usize,
     207              36 :             not_evictable: usize,
     208              36 :             skipped_for_shutdown: usize,
     209              36 :         }
     210              36 : 
     211              36 :         let mut stats = EvictionStats::default();
     212                 :         // Gather layers for eviction.
     213                 :         // NB: all the checks can be invalidated as soon as we release the layer map lock.
     214                 :         // We don't want to hold the layer map lock during eviction.
     215                 : 
     216                 :         // So, we just need to deal with this.
     217                 : 
     218              36 :         let remote_client = match self.remote_client.as_ref() {
     219              36 :             Some(c) => c,
     220                 :             None => {
     221 UBC           0 :                 error!("no remote storage configured, cannot evict layers");
     222               0 :                 return ControlFlow::Continue(());
     223                 :             }
     224                 :         };
     225                 : 
     226 CBC          36 :         let mut js = tokio::task::JoinSet::new();
     227                 :         {
     228              36 :             let guard = self.layers.read().await;
     229              36 :             let layers = guard.layer_map();
     230            1899 :             for hist_layer in layers.iter_historic_layers() {
     231            1899 :                 let hist_layer = guard.get_from_desc(&hist_layer);
     232                 : 
     233                 :                 // guard against eviction while we inspect it; it might be that eviction_task and
     234                 :                 // disk_usage_eviction_task both select the same layers to be evicted, and
     235                 :                 // seemingly free up double the space. both succeeding is of no consequence.
     236            1899 :                 let guard = match hist_layer.keep_resident().await {
     237            1277 :                     Ok(Some(l)) => l,
     238             622 :                     Ok(None) => continue,
     239 UBC           0 :                     Err(e) => {
     240                 :                         // these should not happen, but we cannot make them statically impossible right
     241                 :                         // now.
     242               0 :                         tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
     243               0 :                         continue;
     244                 :                     }
     245                 :                 };
     246                 : 
     247 CBC        1277 :                 let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
     248 UBC           0 :                     // We only use this fallback if there's an implementation error.
     249               0 :                     // `latest_activity` already does rate-limited warn!() log.
     250               0 :                     debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
     251               0 :                     SystemTime::now()
     252 CBC        1277 :                 });
     253                 : 
     254            1277 :                 let no_activity_for = match now.duration_since(last_activity_ts) {
     255            1037 :                     Ok(d) => d,
     256             240 :                     Err(_e) => {
     257             240 :                         // We reach here if `now` < `last_activity_ts`, which can legitimately
     258             240 :                         // happen if there is an access between us getting `now`, and us getting
     259             240 :                         // the access stats from the layer.
     260             240 :                         //
     261             240 :                         // The other reason why it can happen is system clock skew because
     262             240 :                         // SystemTime::now() is not monotonic, so, even if there is no access
     263             240 :                         // to the layer after we get `now` at the beginning of this function,
     264             240 :                         // it could be that `now`  < `last_activity_ts`.
     265             240 :                         //
     266             240 :                         // To distinguish the cases, we would need to record `Instant`s in the
     267             240 :                         // access stats (i.e., monotonic timestamps), but then, the timestamp
     268             240 :                         // values in the access stats would need to be `Instant`s, and hence
     269             240 :                         // they would be meaningless outside of the pageserver process.
     270             240 :                         // At the time of writing, the trade-off is that access stats are more
     271             240 :                         // valuable than detecting clock skew.
     272             240 :                         continue;
     273                 :                     }
     274                 :                 };
     275            1037 :                 let layer = guard.drop_eviction_guard();
     276            1037 :                 if no_activity_for > p.threshold {
     277              38 :                     let remote_client = remote_client.clone();
     278              38 :                     // this could cause a lot of allocations in some cases
     279              38 :                     js.spawn(async move { layer.evict_and_wait(&remote_client).await });
     280              38 :                     stats.candidates += 1;
     281             999 :                 }
     282                 :             }
     283                 :         };
     284                 : 
     285              36 :         let join_all = async move {
     286              74 :             while let Some(next) = js.join_next().await {
     287 UBC           0 :                 match next {
     288 CBC          38 :                     Ok(Ok(())) => stats.evicted += 1,
     289 UBC           0 :                     Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
     290               0 :                         stats.not_evictable += 1;
     291               0 :                     }
     292               0 :                     Err(je) if je.is_cancelled() => unreachable!("not used"),
     293               0 :                     Err(je) if je.is_panic() => {
     294               0 :                         /* already logged */
     295               0 :                         stats.errors += 1;
     296               0 :                     }
     297               0 :                     Err(je) => tracing::error!("unknown JoinError: {je:?}"),
     298                 :                 }
     299                 :             }
     300 CBC          36 :             stats
     301              36 :         };
     302                 : 
     303              36 :         tokio::select! {
     304              36 :             stats = join_all => {
     305                 :                 if stats.candidates == stats.not_evictable {
     306 UBC           0 :                     debug!(stats=?stats, "eviction iteration complete");
     307                 :                 } else if stats.errors > 0 || stats.not_evictable > 0 {
     308               0 :                     warn!(stats=?stats, "eviction iteration complete");
     309                 :                 } else {
     310 CBC           2 :                     info!(stats=?stats, "eviction iteration complete");
     311                 :                 }
     312                 :             }
     313                 :             _ = cancel.cancelled() => {
     314                 :                 // just drop the joinset to "abort"
     315                 :             }
     316                 :         }
     317                 : 
     318              36 :         ControlFlow::Continue(())
     319              36 :     }
     320                 : 
     321              36 :     #[instrument(skip_all)]
     322                 :     async fn imitate_layer_accesses(
     323                 :         &self,
     324                 :         p: &EvictionPolicyLayerAccessThreshold,
     325                 :         cancel: &CancellationToken,
     326                 :         ctx: &RequestContext,
     327                 :     ) -> ControlFlow<()> {
     328                 :         let mut state = self.eviction_task_timeline_state.lock().await;
     329                 : 
     330                 :         // Only do the imitate_layer accesses approximately as often as the threshold.  A little
     331                 :         // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
     332                 :         let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);
     333                 : 
     334                 :         match state.last_layer_access_imitation {
     335                 :             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
     336                 :             _ => {
     337                 :                 self.imitate_timeline_cached_layer_accesses(ctx).await;
     338                 :                 state.last_layer_access_imitation = Some(tokio::time::Instant::now())
     339                 :             }
     340                 :         }
     341                 :         drop(state);
     342                 : 
     343                 :         if cancel.is_cancelled() {
     344                 :             return ControlFlow::Break(());
     345                 :         }
     346                 : 
     347                 :         // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
     348                 :         // Make one of the tenant's timelines draw the short straw and run the calculation.
     349                 :         // The others wait until the calculation is done so that they take into account the
     350                 :         // imitated accesses that the winner made.
     351                 :         let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id, true) {
     352                 :             Ok(t) => t,
     353                 :             Err(_) => {
     354                 :                 return ControlFlow::Break(());
     355                 :             }
     356                 :         };
     357                 :         let mut state = tenant.eviction_task_tenant_state.lock().await;
     358                 :         match state.last_layer_access_imitation {
     359                 :             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
     360                 :             _ => {
     361                 :                 self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
     362                 :                     .await;
     363                 :                 state.last_layer_access_imitation = Some(tokio::time::Instant::now());
     364                 :             }
     365                 :         }
     366                 :         drop(state);
     367                 : 
     368                 :         if cancel.is_cancelled() {
     369                 :             return ControlFlow::Break(());
     370                 :         }
     371                 : 
     372                 :         ControlFlow::Continue(())
     373                 :     }
     374                 : 
     375                 :     /// Recompute the values which would cause on-demand downloads during restart.
     376 UBC           0 :     #[instrument(skip_all)]
     377                 :     async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
     378                 :         let lsn = self.get_last_record_lsn();
     379                 : 
     380                 :         // imitate on-restart initial logical size
     381                 :         let size = self
     382                 :             .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
     383                 :             .instrument(info_span!("calculate_logical_size"))
     384                 :             .await;
     385                 : 
     386                 :         match &size {
     387                 :             Ok(_size) => {
     388                 :                 // good, don't log it to avoid confusion
     389                 :             }
     390                 :             Err(_) => {
     391                 :                 // we have known issues for which we already log this on consumption metrics,
     392                 :                 // gc, and compaction. leave logging out for now.
     393                 :                 //
     394                 :                 // https://github.com/neondatabase/neon/issues/2539
     395                 :             }
     396                 :         }
     397                 : 
     398                 :         // imitate repartitioning on first compaction
     399                 :         if let Err(e) = self
     400                 :             .collect_keyspace(lsn, ctx)
     401                 :             .instrument(info_span!("collect_keyspace"))
     402                 :             .await
     403                 :         {
     404                 :             // if this failed, we probably failed logical size because these use the same keys
     405                 :             if size.is_err() {
     406                 :                 // ignore, see above comment
     407                 :             } else {
     408                 :                 match e {
     409                 :                     CollectKeySpaceError::Cancelled => {
     410                 :                         // Shutting down, ignore
     411                 :                     }
     412                 :                     err => {
     413               0 :                         warn!(
     414               0 :                             "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
     415               0 :                         );
     416                 :                     }
     417                 :                 }
     418                 :             }
     419                 :         }
     420                 :     }
     421                 : 
     422                 :     // Imitate the synthetic size calculation done by the consumption_metrics module.
     423 CBC           8 :     #[instrument(skip_all)]
     424                 :     async fn imitate_synthetic_size_calculation_worker(
     425                 :         &self,
     426                 :         tenant: &Arc<Tenant>,
     427                 :         cancel: &CancellationToken,
     428                 :         ctx: &RequestContext,
     429                 :     ) {
     430                 :         if self.conf.metric_collection_endpoint.is_none() {
     431                 :             // We don't start the consumption metrics task if this is not set in the config.
     432                 :             // So, no need to imitate the accesses in that case.
     433                 :             return;
     434                 :         }
     435                 : 
     436                 :         // The consumption metrics are collected on a per-tenant basis, by a single
     437                 :         // global background loop.
     438                 :         // It limits the number of synthetic size calculations using the global
     439                 :         // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
     440                 :         // the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
     441                 :         //
     442                 :         // If we used that same semaphore here, then we'd compete for the
     443                 :         // same permits, which may impact timeliness of consumption metrics.
     444                 :         // That is a no-go, as consumption metrics are much more important
     445                 :         // than what we do here.
     446                 :         //
     447                 :         // So, we have a separate semaphore, initialized to the same
     448                 :         // number of permits as the `concurrent_tenant_size_logical_size_queries`.
     449                 :         // In the worst case, we would have twice the amount of concurrent size calculations.
     450                 :         // But in practice, the `p.threshold` >> `consumption metric interval`, and
     451                 :         // we spread out the eviction task using `random_init_delay`.
     452                 :         // So, the chance of the worst case is quite low in practice.
     453                 :         // It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
     454                 :         // So, we must coordinate with the other eviction tasks of this tenant.
     455                 :         let limit = self
     456                 :             .conf
     457                 :             .eviction_task_immitated_concurrent_logical_size_queries
     458                 :             .inner();
     459                 : 
     460                 :         let mut throwaway_cache = HashMap::new();
     461                 :         let gather = crate::tenant::size::gather_inputs(
     462                 :             tenant,
     463                 :             limit,
     464                 :             None,
     465                 :             &mut throwaway_cache,
     466                 :             LogicalSizeCalculationCause::EvictionTaskImitation,
     467                 :             cancel,
     468                 :             ctx,
     469                 :         )
     470                 :         .instrument(info_span!("gather_inputs"));
     471                 : 
     472              16 :         tokio::select! {
     473                 :             _ = cancel.cancelled() => {}
     474               8 :             gather_result = gather => {
     475                 :                 match gather_result {
     476                 :                     Ok(_) => {},
     477                 :                     Err(e) => {
     478                 :                         // We don't care about the result, but, if it failed, we should log it,
     479                 :                         // since consumption metric might be hitting the cached value and
     480                 :                         // thus not encountering this error.
     481 UBC           0 :                         warn!("failed to imitate synthetic size calculation accesses: {e:#}")
     482                 :                     }
     483                 :                 }
     484                 :            }
     485                 :         }
     486                 :     }
     487                 : }
        

Generated by: LCOV version 2.1-beta