//! The per-timeline layer eviction task, which evicts data that has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!     - initial logical size
//!     - partitioning
//!     - (other currently missing unknowns)
//!
//! Items with parentheses are not (yet) touched by this task.
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
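//!
//! The behavior of this task is driven by the tenant's `EvictionPolicy`. As an illustrative
//! sketch (field names follow `EvictionPolicyLayerAccessThreshold`; the TOML shape and the
//! durations are assumptions for illustration, not the authoritative config format), a
//! threshold-based policy could look roughly like:
//!
//! ```text
//! eviction_policy = { kind = "LayerAccessThreshold", period = "10m", threshold = "20m" }
//! ```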
use std::{
    collections::HashMap,
    ops::ControlFlow,
    sync::Arc,
    time::{Duration, SystemTime},
};

use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
        storage_layer::PersistentLayer,
        tasks::{BackgroundLoopKind, RateLimitError},
        timeline::EvictionError,
        LogicalSizeCalculationCause, Tenant,
    },
};

use utils::completion;

use super::Timeline;

#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}

#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            Some(self.tenant_id),
            Some(self.timeline_id),
            &format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id),
            false,
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(cancel).await;
                Ok(())
            },
        );
    }

    #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
        use crate::tenant::tasks::random_init_delay;
        {
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if random_init_delay(period, &cancel).await.is_err() {
                return;
            }
        }

        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
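        // Main loop: each iteration either asks us to stop (Break) or tells us when the next
        // iteration should start (Continue(sleep_until)); we sleep until then, unless
        // cancellation fires first.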
        loop {
            let policy = self.get_eviction_policy();
            let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
                    if tokio::time::timeout_at(sleep_until, cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }

    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                let start = Instant::now();
                match self.eviction_iteration_threshold(p, cancel, ctx).await {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                let elapsed = start.elapsed();
                crate::tenant::tasks::warn_when_period_overrun(
                    elapsed,
                    p.period,
                    BackgroundLoopKind::Eviction,
                );
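                // Record how long the iteration took, labeled by the policy's period and
                // threshold (in seconds), so overruns can be correlated with the configuration.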
                crate::metrics::EVICTION_ITERATION_DURATION
                    .get_metric_with_label_values(&[
                        &format!("{}", p.period.as_secs()),
                        &format!("{}", p.threshold.as_secs()),
                    ])
                    .unwrap()
                    .observe(elapsed.as_secs_f64());
                ControlFlow::Continue(start + p.period)
            }
        }
    }

    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(
            BackgroundLoopKind::Eviction,
            ctx,
            cancel,
        )
        .await
        {
            Ok(permit) => permit,
            Err(RateLimitError::Cancelled) => return ControlFlow::Break(()),
        };

        // If we evict layers but keep cached values derived from those layers, then
        // we face a storm of on-demand downloads after pageserver restart.
        // The reason is that the restart empties the caches, and so, the values
        // need to be re-computed by accessing layers, which we evicted while the
        // caches were filled.
        //
        // Solutions here would be one of the following:
        // 1. Have a persistent cache.
        // 2. Count every access to a cached value towards the access stats of all layers
        //    that were accessed to compute the value in the first place.
        // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
        //    get re-computed from layers, thereby counting towards layer access stats.
        // 4. Make the eviction task imitate the layer accesses that typically hit caches.
        //
        // We follow approach (4) here because in Neon prod deployment:
        // - page cache is quite small => high churn => low hit rate
        //   => eviction gets correct access stats
        // - value-level caches such as logical size & repartitioning have a high hit rate,
        //   especially for inactive tenants
        //   => eviction sees zero accesses for these
        //   => they cause the on-demand download storm on pageserver restart
        //
        // We should probably move to persistent caches in the future, or avoid
        // having inactive tenants attached to pageserver in the first place.
192 : // having inactive tenants attached to pageserver in the first place.
193 CBC 2855 : match self.imitate_layer_accesses(p, cancel, ctx).await {
194 UBC 0 : ControlFlow::Break(()) => return ControlFlow::Break(()),
195 CBC 26 : ControlFlow::Continue(()) => (),
196 26 : }
197 26 :
198 26 : #[allow(dead_code)]
199 26 : #[derive(Debug, Default)]
200 26 : struct EvictionStats {
201 26 : candidates: usize,
202 26 : evicted: usize,
203 26 : errors: usize,
204 26 : not_evictable: usize,
205 26 : skipped_for_shutdown: usize,
206 26 : }
207 26 :
208 26 : let mut stats = EvictionStats::default();
209 : // Gather layers for eviction.
210 : // NB: all the checks can be invalidated as soon as we release the layer map lock.
211 : // We don't want to hold the layer map lock during eviction.
212 : // So, we just need to deal with this.
213 26 : let candidates: Vec<Arc<dyn PersistentLayer>> = {
214 26 : let guard = self.layers.read().await;
215 26 : let layers = guard.layer_map();
216 26 : let mut candidates = Vec::new();
217 1565 : for hist_layer in layers.iter_historic_layers() {
218 1565 : let hist_layer = guard.get_from_desc(&hist_layer);
219 1565 : if hist_layer.is_remote_layer() {
220 570 : continue;
221 995 : }
222 995 :
223 995 : let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
224 UBC 0 : // We only use this fallback if there's an implementation error.
225 0 : // `latest_activity` already does rate-limited warn!() log.
226 0 : debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
227 0 : SystemTime::now()
228 CBC 995 : });
229 :
                let no_activity_for = match now.duration_since(last_activity_ts) {
                    Ok(d) => d,
                    Err(_e) => {
                        // We reach here if `now` < `last_activity_ts`, which can legitimately
                        // happen if there is an access between us getting `now` and us getting
                        // the access stats from the layer.
                        //
                        // The other reason why it can happen is system clock skew: because
                        // SystemTime::now() is not monotonic, `now` can end up earlier than
                        // `last_activity_ts` even if there was no access to the layer after
                        // we got `now` at the beginning of this function.
                        //
                        // To distinguish the two cases, we would need to record monotonic
                        // timestamps (`Instant`s) in the access stats, but those would be
                        // meaningless outside of the pageserver process.
                        // At the time of writing, the trade-off is that access stats are more
                        // valuable than detecting clock skew.
                        continue;
                    }
                };
                if no_activity_for > p.threshold {
                    candidates.push(hist_layer)
                }
            }
            candidates
        };
        stats.candidates = candidates.len();

        let remote_client = match self.remote_client.as_ref() {
            None => {
                error!(
                    num_candidates = candidates.len(),
                    "no remote storage configured, cannot evict layers"
                );
                return ControlFlow::Continue(());
            }
            Some(c) => c,
        };

        let results = match self
            .evict_layer_batch(remote_client, &candidates[..], cancel.clone())
            .await
        {
            Err(pre_err) => {
                stats.errors += candidates.len();
                error!("could not do any evictions: {pre_err:#}");
                return ControlFlow::Continue(());
            }
            Ok(results) => results,
        };
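        // evict_layer_batch returns one result slot per candidate, in the same order;
        // `None` means the layer was skipped because shutdown started mid-batch.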
        assert_eq!(results.len(), candidates.len());
        for (l, result) in candidates.iter().zip(results) {
            match result {
                None => {
                    stats.skipped_for_shutdown += 1;
                }
                Some(Ok(())) => {
                    stats.evicted += 1;
                }
                Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
                    stats.not_evictable += 1;
                }
                Some(Err(EvictionError::FileNotFound)) => {
                    // compaction/gc removed the file while we were waiting on layer_removal_cs
                    stats.not_evictable += 1;
                }
                Some(Err(
                    e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
                )) => {
                    let e = utils::error::report_compact_sources(&e);
                    warn!(layer = %l, "failed to evict layer: {e}");
                    stats.not_evictable += 1;
                }
                Some(Err(EvictionError::MetadataInconsistency(detail))) => {
                    warn!(layer = %l, "failed to evict layer: {detail}");
                    stats.not_evictable += 1;
                }
            }
        }
        if stats.candidates == stats.not_evictable {
            debug!(stats=?stats, "eviction iteration complete");
        } else if stats.errors > 0 || stats.not_evictable > 0 {
            warn!(stats=?stats, "eviction iteration complete");
        } else {
            info!(stats=?stats, "eviction iteration complete");
        }
        ControlFlow::Continue(())
    }

    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only do the layer access imitation approximately as often as the threshold, and a
        // little more frequently than that, to avoid this period racing with the
        // threshold/period-th eviction iteration.
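        // For example, with a hypothetical threshold of 20 minutes and a period of 10 minutes,
        // imitation runs at most every 10 minutes; if the period exceeds the threshold,
        // checked_sub fails and the interval falls back to the threshold itself.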
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        //
        // It is critical that we are responsive to cancellation here. Otherwise, we deadlock
        // with tenant deletion (which holds TENANTS in read mode) and with any other task
        // that attempts to acquire TENANTS in write mode before we call get_tenant here.
        // See https://github.com/neondatabase/neon/issues/5284.
        let res = tokio::select! {
            _ = cancel.cancelled() => {
                return ControlFlow::Break(());
            }
            res = crate::tenant::mgr::get_tenant(self.tenant_id, true) => {
                res
            }
        };
        let tenant = match res {
            Ok(t) => t,
            Err(_) => {
                return ControlFlow::Break(());
            }
        };
        let mut state = tenant.eviction_task_tenant_state.lock().await;
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(
        &self,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        let lsn = self.get_last_record_lsn();

        // imitate the on-restart initial logical size calculation
        let size = self
            .calculate_logical_size(
                lsn,
                LogicalSizeCalculationCause::EvictionTaskImitation,
                cancel.clone(),
                ctx,
            )
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate the repartitioning that happens on first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, the logical size calculation above probably failed too,
            // because both read the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                warn!(
                    "failed to collect keyspace but succeeded in calculating logical size: {e:#}"
                );
            }
        }
    }

    // Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Arc<Tenant>,
        ctx: &RequestContext,
        cancel: &CancellationToken,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver. (Size calculation is somewhat expensive in terms of CPU and IO.)
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would have twice the number of concurrent size calculations.
        // But in practice, `p.threshold` >> `consumption metric interval`, and
        // we spread out the eviction task using `random_init_delay`.
        // So, the chance of the worst case is quite low in practice.
        // The synthetic size calculation is per-tenant, but this eviction task is per-timeline,
        // so we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

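        // The size inputs are gathered into a throwaway cache: we only want the side effect
        // of the layer accesses the calculation performs, not the computed sizes themselves.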
        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    Err(e) => {
                        // We don't care about the result, but if it failed we should log it,
                        // since the consumption metrics worker might be hitting the cached value
                        // and thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}