//! The per-timeline layer eviction task, which evicts data which has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!     - initial logical size
//!     - partitioning
//!     - (other currently missing unknowns)
//!
//! Items with parentheses are not (yet) touched by this task.
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, info, info_span, instrument, warn};
use utils::completion;
use utils::sync::gate::GateGuard;

use super::Timeline;
use crate::context::{DownloadBehavior, RequestContext};
use crate::pgdatadir_mapping::CollectKeySpaceError;
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};
use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::storage_layer::LayerVisibilityHint;
use crate::tenant::tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit, sleep_random};
use crate::tenant::timeline::EvictionError;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{LogicalSizeCalculationCause, TenantShard};

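/// Per-timeline eviction task state: when this timeline last ran the layer access
/// imitation (`imitate_timeline_cached_layer_accesses`), if ever.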
#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}

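/// Per-tenant eviction task state, shared by all of the tenant's timelines: when the
/// synthetic size calculation was last imitated for this tenant, if ever.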
#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        parent: Arc<TenantShard>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            self.tenant_shard_id,
            Some(self.timeline_id),
            &format!(
                "layer eviction for {}/{}",
                self.tenant_shard_id, self.timeline_id
            ),
            async move {
                tokio::select! {
                    _ = self_clone.cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(parent).await;
                Ok(())
            },
        );
    }

    #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, tenant: Arc<TenantShard>) {
        // acquire the gate guard only once within a useful span
        let Ok(guard) = self.gate.enter() else {
            return;
        };

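        // Before the first iteration, sleep for a random duration derived from the policy
        // period so that eviction work across timelines is spread out after (re)start.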
        {
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::OnlyImitiate(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if sleep_random(period, &self.cancel).await.is_err() {
                return;
            }
        }

        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn)
            .with_scope_timeline(&self);
        loop {
            let policy = self.get_eviction_policy();
            let cf = self
                .eviction_iteration(&tenant, &policy, &self.cancel, &guard, &ctx)
                .await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
                    if tokio::time::timeout_at(sleep_until, self.cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }

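    /// Run one iteration under the given policy. Returns the instant at which the next
    /// iteration should start, or `Break` if the task should shut down.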
    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        tenant: &TenantShard,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        let start = Instant::now();
        let (period, threshold) = match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                match self
                    .eviction_iteration_threshold(tenant, p, cancel, gate, ctx)
                    .await
                {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                (p.period, p.threshold)
            }
            EvictionPolicy::OnlyImitiate(p) => {
                if self
                    .imitiate_only(tenant, p, cancel, gate, ctx)
                    .await
                    .is_break()
                {
                    return ControlFlow::Break(());
                }
                (p.period, p.threshold)
            }
        };

        let elapsed = start.elapsed();
        crate::tenant::tasks::warn_when_period_overrun(
            elapsed,
            period,
            BackgroundLoopKind::Eviction,
        );
        // FIXME: if we were to mix policies on a pageserver, we would have no way to sense this. I
        // don't think that is a relevant fear however, and regardless the imitation should be the
        // most costly part.
        crate::metrics::EVICTION_ITERATION_DURATION
            .get_metric_with_label_values(&[
                &format!("{}", period.as_secs()),
                &format!("{}", threshold.as_secs()),
            ])
            .unwrap()
            .observe(elapsed.as_secs_f64());

        ControlFlow::Continue(start + period)
    }

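    /// One iteration of the `LayerAccessThreshold` policy: first imitate the layer accesses
    /// that back cached values, then evict resident layers whose last recorded access is
    /// older than the policy's threshold (or, for covered layers, older than the period).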
    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        tenant: &TenantShard,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        let permit = self.acquire_imitation_permit(cancel, ctx).await?;

        self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
            .await?;

        #[derive(Debug, Default)]
        struct EvictionStats {
            candidates: usize,
            evicted: usize,
            errors: usize,
            not_evictable: usize,
            timeouts: usize,
            #[allow(dead_code)]
            skipped_for_shutdown: usize,
        }

        let mut stats = EvictionStats::default();
        // Gather layers for eviction.
        // NB: all the checks can be invalidated as soon as we release the layer map lock,
        // and we don't want to hold the layer map lock during eviction. So we accept that
        // the decisions below may already be stale by the time the evictions run.
        let mut js = tokio::task::JoinSet::new();
        {
            let guard = self.layers.read(LayerManagerLockHolder::Eviction).await;

            guard
                .likely_resident_layers()
                .filter(|layer| {
                    let last_activity_ts = layer.latest_activity();

                    let no_activity_for = match now.duration_since(last_activity_ts) {
                        Ok(d) => d,
                        Err(_e) => {
                            // We reach here if `now` < `last_activity_ts`, which can legitimately
                            // happen if there is an access between us getting `now`, and us getting
                            // the access stats from the layer.
                            //
                            // The other reason why it can happen is system clock skew: because
                            // SystemTime::now() is not monotonic, `now` can be < `last_activity_ts`
                            // even if there is no access to the layer after we get `now` at the
                            // beginning of this function.
                            //
                            // To distinguish the cases, we would need to record `Instant`s
                            // (i.e., monotonic timestamps) in the access stats, but then those
                            // timestamps would be meaningless outside of the pageserver process.
                            // At the time of writing, the trade-off is that access stats are more
                            // valuable than detecting clock skew.
                            return false;
                        }
                    };

                    match layer.visibility() {
                        LayerVisibilityHint::Visible => {
                            // Usual case: a visible layer might be read any time, and we will keep it
                            // resident until it hits our configured TTL threshold.
                            no_activity_for > p.threshold
                        }
                        LayerVisibilityHint::Covered => {
                            // Covered layers: this is probably a layer that was recently covered by
                            // an image layer during compaction. We don't evict it immediately, but
                            // it doesn't stay resident for the full `threshold`: we just keep it
                            // for a shorter time in case
                            // - it is used for Timestamp->LSN lookups
                            // - a new branch is created in recent history which will read this layer
                            no_activity_for > p.period
                        }
                    }
                })
                .cloned()
                .for_each(|layer| {
                    js.spawn(async move {
                        layer
                            .evict_and_wait(std::time::Duration::from_secs(5))
                            .await
                    });
                    stats.candidates += 1;
                });
        };

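        // Await the spawned evictions and tally the outcomes. On cancellation we simply drop
        // the JoinSet, which aborts any evictions still in flight.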
        let join_all = async move {
            while let Some(next) = js.join_next().await {
                match next {
                    Ok(Ok(())) => stats.evicted += 1,
                    Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
                        stats.not_evictable += 1;
                    }
                    Ok(Err(EvictionError::Timeout)) => {
                        stats.timeouts += 1;
                    }
                    Err(je) if je.is_cancelled() => unreachable!("not used"),
                    Err(je) if je.is_panic() => {
                        /* already logged */
                        stats.errors += 1;
                    }
                    Err(je) => tracing::error!("unknown JoinError: {je:?}"),
                }
            }
            stats
        };

        tokio::select! {
            stats = join_all => {
                if stats.candidates == stats.not_evictable {
                    debug!(stats=?stats, "eviction iteration complete");
                } else if stats.errors > 0 || stats.not_evictable > 0 || stats.timeouts > 0 {
                    // reminder: timeouts are not eviction cancellations
                    warn!(stats=?stats, "eviction iteration complete");
                } else {
                    info!(stats=?stats, "eviction iteration complete");
                }
            }
            _ = cancel.cancelled() => {
                // just drop the joinset to "abort"
            }
        }

        ControlFlow::Continue(())
    }

    /// Like `eviction_iteration_threshold`, but without any eviction. Eviction will be done by
    /// the disk-usage-based eviction task instead.
    async fn imitiate_only(
        self: &Arc<Self>,
        tenant: &TenantShard,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let permit = self.acquire_imitation_permit(cancel, ctx).await?;

        self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
            .await
    }

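    /// Acquire a background-loop concurrency permit for eviction work, or `Break` if either
    /// the caller's cancellation token or the timeline's own cancellation fires first.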
    async fn acquire_imitation_permit(
        &self,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
        let acquire_permit =
            crate::tenant::tasks::acquire_concurrency_permit(BackgroundLoopKind::Eviction, ctx);

        tokio::select! {
            permit = acquire_permit => ControlFlow::Continue(permit),
            _ = cancel.cancelled() => ControlFlow::Break(()),
            _ = self.cancel.cancelled() => ControlFlow::Break(()),
        }
    }

    /// If we evict layers but keep cached values derived from those layers, then
    /// we face a storm of on-demand downloads after pageserver restart.
    /// The reason is that the restart empties the caches, and so, the values
    /// need to be re-computed by accessing layers, which we evicted while the
    /// caches were filled.
    ///
    /// Solutions here would be one of the following:
    /// 1. Have a persistent cache.
    /// 2. Count every access to a cached value towards the access stats of all layers
    ///    that were accessed to compute the value in the first place.
    /// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
    ///    get re-computed from layers, thereby counting towards layer access stats.
    /// 4. Make the eviction task imitate the layer accesses that typically hit caches.
    ///
    /// We follow approach (4) here because in Neon prod deployment:
    /// - page cache is quite small => high churn => low hit rate
    ///   => eviction gets correct access stats
    /// - value-level caches such as logical size & repartitioning have a high hit rate,
    ///   especially for inactive tenants
    ///   => eviction sees zero accesses for these
    ///   => they cause the on-demand download storm on pageserver restart
    ///
    /// We should probably move to persistent caches in the future, or avoid
    /// having inactive tenants attached to pageserver in the first place.
    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        tenant: &TenantShard,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        permit: BackgroundLoopSemaphorePermit<'static>,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        if !self.tenant_shard_id.is_shard_zero() {
            // Shards != 0 do not maintain accurate relation sizes, and do not need to calculate logical size
            // for consumption metrics (consumption metrics are only sent from shard 0). We may therefore
            // skip imitating logical size accesses for eviction purposes.
            return ControlFlow::Continue(());
        }

        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only run the layer access imitation approximately as often as the threshold. A little
        // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(gate, ctx).await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let (mut state, _permit) = {
            if let Ok(locked) = tenant.eviction_task_tenant_state.try_lock() {
                (locked, permit)
            } else {
                // we might need to wait for a long time here in case of pathological synthetic
                // size calculation performance
                drop(permit);
                let locked = tokio::select! {
                    locked = tenant.eviction_task_tenant_state.lock() => locked,
                    _ = self.cancel.cancelled() => {
                        return ControlFlow::Break(())
                    },
                    _ = cancel.cancelled() => {
                        return ControlFlow::Break(())
                    }
                };
                // then reacquire -- this will be bad if there is a lot of traffic, but because we
                // released the permit, the overall latency will be much better.
                let permit = self.acquire_imitation_permit(cancel, ctx).await?;
                (locked, permit)
            }
        };
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(tenant, cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(
        &self,
        guard: &GateGuard,
        ctx: &RequestContext,
    ) {
        let lsn = self.get_last_record_lsn();

        // imitate the on-restart initial logical size calculation
        let size = self
            .calculate_logical_size(
                lsn,
                LogicalSizeCalculationCause::EvictionTaskImitation,
                guard,
                ctx,
            )
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate the repartitioning that happens on the first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, we probably failed logical size because these use the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                match e {
                    CollectKeySpaceError::Cancelled => {
                        // Shutting down, ignore
                    }
                    err => {
                        warn!(
                            "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
                        );
                    }
                }
            }
        }
    }

    // Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &TenantShard,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver. (Size calculation is somewhat expensive in terms of CPU and IOs.)
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as the `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would have twice the amount of concurrent size calculations.
        // But in practice, `p.threshold` >> the consumption metric interval, and
        // we spread out the eviction task using `random_init_delay`.
        // So, the chance of the worst case is quite low in practice.
        // Also, the synthetic size calculation is per-tenant while eviction_task.rs runs
        // per-timeline, so we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            cancel,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    // It can happen sometimes that we hit this instead of the cancellation token firing above
                    Err(CalculateSyntheticSizeError::Cancelled) => {}
                    Err(e) => {
                        // We don't care about the result, but, if it failed, we should log it,
                        // since consumption metrics might be hitting the cached value and
                        // thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}