//! The per-timeline layer eviction task, which evicts data that has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!     - initial logical size
//!     - partitioning
//!     - (other currently missing unknowns)
//!
//! Items with parentheses are not (yet) touched by this task.
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
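//!
//! For orientation, a rough sketch of how the two active policy variants would be constructed
//! in code (illustrative values; this assumes `EvictionPolicyLayerAccessThreshold` carries only
//! the `period` and `threshold` fields used by this module):
//!
//! ```ignore
//! // Evaluate once per `period`; evict layers whose last access is older than `threshold`.
//! EvictionPolicy::LayerAccessThreshold(EvictionPolicyLayerAccessThreshold {
//!     period: Duration::from_secs(60),
//!     threshold: Duration::from_secs(24 * 60 * 60),
//! });
//! // Same access-imitation bookkeeping, but leave the actual eviction to the
//! // disk-usage-based eviction task.
//! EvictionPolicy::OnlyImitiate(EvictionPolicyLayerAccessThreshold {
//!     period: Duration::from_secs(60),
//!     threshold: Duration::from_secs(24 * 60 * 60),
//! });
//! ```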
use std::{
    collections::HashMap,
    ops::ControlFlow,
    sync::Arc,
    time::{Duration, SystemTime},
};

use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, instrument, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},
    pgdatadir_mapping::CollectKeySpaceError,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        size::CalculateSyntheticSizeError,
        storage_layer::LayerVisibilityHint,
        tasks::{sleep_random, BackgroundLoopKind, BackgroundLoopSemaphorePermit},
        timeline::EvictionError,
        LogicalSizeCalculationCause, Tenant,
    },
};

use utils::{completion, sync::gate::GateGuard};

use super::Timeline;
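
/// Per-timeline state: when this timeline last imitated its cached-value layer accesses
/// (initial logical size and keyspace collection).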
#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}
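
/// Per-tenant state: when any of the tenant's timelines last imitated the tenant-scoped
/// synthetic size calculation.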
#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        parent: Arc<Tenant>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            self.tenant_shard_id,
            Some(self.timeline_id),
            &format!(
                "layer eviction for {}/{}",
                self.tenant_shard_id, self.timeline_id
            ),
            async move {
                tokio::select! {
                    _ = self_clone.cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(parent).await;
                Ok(())
            },
        );
    }
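
    /// Body of the per-timeline eviction task: sleep a random duration derived from the
    /// policy's period (so timelines do not run in lockstep), then run one
    /// `eviction_iteration` per period until the timeline is cancelled.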
    #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
        // acquire the gate guard only once within a useful span
        let Ok(guard) = self.gate.enter() else {
            return;
        };

        {
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::OnlyImitiate(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if sleep_random(period, &self.cancel).await.is_err() {
                return;
            }
        }

        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
        loop {
            let policy = self.get_eviction_policy();
            let cf = self
                .eviction_iteration(&tenant, &policy, &self.cancel, &guard, &ctx)
                .await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
                    if tokio::time::timeout_at(sleep_until, self.cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }
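
    /// Runs one iteration under the currently configured policy and returns the instant at
    /// which the next iteration should start, or `Break` once the task should shut down.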
    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        tenant: &Tenant,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        let start = Instant::now();
        let (period, threshold) = match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                match self
                    .eviction_iteration_threshold(tenant, p, cancel, gate, ctx)
                    .await
                {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                (p.period, p.threshold)
            }
            EvictionPolicy::OnlyImitiate(p) => {
                if self
                    .imitiate_only(tenant, p, cancel, gate, ctx)
                    .await
                    .is_break()
                {
                    return ControlFlow::Break(());
                }
                (p.period, p.threshold)
            }
        };

        let elapsed = start.elapsed();
        crate::tenant::tasks::warn_when_period_overrun(
            elapsed,
            period,
            BackgroundLoopKind::Eviction,
        );
        // FIXME: if we were to mix policies on a pageserver, we would have no way to sense this. I
        // don't think that is a relevant fear however, and regardless the imitation should be the
        // most costly part.
        crate::metrics::EVICTION_ITERATION_DURATION
            .get_metric_with_label_values(&[
                &format!("{}", period.as_secs()),
                &format!("{}", threshold.as_secs()),
            ])
            .unwrap()
            .observe(elapsed.as_secs_f64());

        ControlFlow::Continue(start + period)
    }
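
    /// One iteration of the `LayerAccessThreshold` policy: first imitate the accesses that
    /// would otherwise only hit in-memory caches, then evict all likely-resident layers whose
    /// last recorded access is older than the policy's threshold.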
    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        tenant: &Tenant,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        let permit = self.acquire_imitation_permit(cancel, ctx).await?;

        self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
            .await?;

        #[derive(Debug, Default)]
        struct EvictionStats {
            candidates: usize,
            evicted: usize,
            errors: usize,
            not_evictable: usize,
            timeouts: usize,
            #[allow(dead_code)]
            skipped_for_shutdown: usize,
        }

        let mut stats = EvictionStats::default();

        // Gather layers for eviction.
        // NB: all the checks can be invalidated as soon as we release the layer map lock.
        // We don't want to hold the layer map lock during eviction, so we simply accept that
        // the candidate set may be slightly stale by the time a layer is actually evicted.
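
        // Spawn the evictions concurrently into a JoinSet; each `evict_and_wait` call below is
        // bounded by a 5-second timeout so a single stuck layer cannot stall the iteration.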
        let mut js = tokio::task::JoinSet::new();
        {
            let guard = self.layers.read().await;

            guard
                .likely_resident_layers()
                .filter(|layer| {
                    let last_activity_ts = layer.latest_activity();

                    let no_activity_for = match now.duration_since(last_activity_ts) {
                        Ok(d) => d,
                        Err(_e) => {
                            // We reach here if `now` < `last_activity_ts`, which can legitimately
                            // happen if there is an access between us getting `now`, and us getting
                            // the access stats from the layer.
                            //
                            // The other reason why it can happen is system clock skew, because
                            // SystemTime::now() is not monotonic: even if there is no access
                            // to the layer after we get `now` at the beginning of this function,
                            // it could be that `now` < `last_activity_ts`.
                            //
                            // To distinguish the cases, we would need to record `Instant`s in the
                            // access stats (i.e., monotonic timestamps), but then the timestamp
                            // values in the access stats would be meaningless outside of the
                            // pageserver process. At the time of writing, the trade-off is that
                            // access stats are more valuable than detecting clock skew.
                            return false;
                        }
                    };

                    match layer.visibility() {
                        LayerVisibilityHint::Visible => {
                            // Usual case: a visible layer might be read any time, and we will keep it
                            // resident until it hits our configured TTL threshold.
                            no_activity_for > p.threshold
                        }
                        LayerVisibilityHint::Covered => {
                            // Covered layers: this is probably a layer that was recently covered by
                            // an image layer during compaction. We don't evict it immediately, but
                            // it doesn't stay resident for the full `threshold` either: we keep it
                            // for a shorter time in case
                            // - it is used for Timestamp->LSN lookups
                            // - a new branch is created in recent history which will read this layer
                            no_activity_for > p.period
                        }
                    }
                })
                .cloned()
                .for_each(|layer| {
                    js.spawn(async move {
                        layer
                            .evict_and_wait(std::time::Duration::from_secs(5))
                            .await
                    });
                    stats.candidates += 1;
                });
        };

        let join_all = async move {
            while let Some(next) = js.join_next().await {
                match next {
                    Ok(Ok(())) => stats.evicted += 1,
                    Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
                        stats.not_evictable += 1;
                    }
                    Ok(Err(EvictionError::Timeout)) => {
                        stats.timeouts += 1;
                    }
                    Err(je) if je.is_cancelled() => unreachable!("not used"),
                    Err(je) if je.is_panic() => {
                        /* already logged */
                        stats.errors += 1;
                    }
                    Err(je) => tracing::error!("unknown JoinError: {je:?}"),
                }
            }
            stats
        };

        tokio::select! {
            stats = join_all => {
                if stats.candidates == stats.not_evictable {
                    debug!(stats=?stats, "eviction iteration complete");
                } else if stats.errors > 0 || stats.not_evictable > 0 || stats.timeouts > 0 {
                    // reminder: timeouts are not eviction cancellations
                    warn!(stats=?stats, "eviction iteration complete");
                } else {
                    info!(stats=?stats, "eviction iteration complete");
                }
            }
            _ = cancel.cancelled() => {
                // just drop the joinset to "abort"
            }
        }

        ControlFlow::Continue(())
    }

    /// Like `eviction_iteration_threshold`, but without any eviction. Eviction is left to the
    /// disk-usage-based eviction task.
    async fn imitiate_only(
        self: &Arc<Self>,
        tenant: &Tenant,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let permit = self.acquire_imitation_permit(cancel, ctx).await?;

        self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
            .await
    }
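
    /// Acquires the shared background-loop concurrency permit for eviction work, giving up
    /// early (with `Break`) if either the caller's or the timeline's cancellation token fires.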
    async fn acquire_imitation_permit(
        &self,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
        let acquire_permit =
            crate::tenant::tasks::acquire_concurrency_permit(BackgroundLoopKind::Eviction, ctx);

        tokio::select! {
            permit = acquire_permit => ControlFlow::Continue(permit),
            _ = cancel.cancelled() => ControlFlow::Break(()),
            _ = self.cancel.cancelled() => ControlFlow::Break(()),
        }
    }

    /// If we evict layers but keep cached values derived from those layers, then
    /// we face a storm of on-demand downloads after pageserver restart.
    /// The reason is that the restart empties the caches, and so, the values
    /// need to be re-computed by accessing layers, which we evicted while the
    /// caches were filled.
    ///
    /// Solutions here would be one of the following:
    /// 1. Have a persistent cache.
    /// 2. Count every access to a cached value towards the access stats of all layers
    ///    that were accessed to compute the value in the first place.
    /// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
    ///    get re-computed from layers, thereby counting towards layer access stats.
    /// 4. Make the eviction task imitate the layer accesses that typically hit caches.
    ///
    /// We follow approach (4) here because in the Neon prod deployment:
    /// - the page cache is quite small => high churn => low hit rate
    ///   => eviction gets correct access stats
    /// - value-level caches such as logical size & repartitioning have a high hit rate,
    ///   especially for inactive tenants
    ///   => eviction sees zero accesses for these
    ///   => they cause the on-demand download storm on pageserver restart
    ///
    /// We should probably move to persistent caches in the future, or avoid
    /// having inactive tenants attached to the pageserver in the first place.
    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        tenant: &Tenant,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        permit: BackgroundLoopSemaphorePermit<'static>,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        if !self.tenant_shard_id.is_shard_zero() {
            // Shards != 0 do not maintain accurate relation sizes, and do not need to calculate
            // logical size for consumption metrics (consumption metrics are only sent from shard 0).
            // We may therefore skip imitating logical size accesses for eviction purposes.
            return ControlFlow::Continue(());
        }

        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only imitate the layer accesses approximately as often as the threshold -- and a little
        // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(gate, ctx).await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let (mut state, _permit) = {
            if let Ok(locked) = tenant.eviction_task_tenant_state.try_lock() {
                (locked, permit)
            } else {
                // we might need to wait for a long time here in case of pathological synthetic
                // size calculation performance
                drop(permit);
                let locked = tokio::select! {
                    locked = tenant.eviction_task_tenant_state.lock() => locked,
                    _ = self.cancel.cancelled() => {
                        return ControlFlow::Break(())
                    },
                    _ = cancel.cancelled() => {
                        return ControlFlow::Break(())
                    }
                };
                // then reacquire -- this will be bad if there is a lot of traffic, but because we
                // released the permit, the overall latency will be much better.
                let permit = self.acquire_imitation_permit(cancel, ctx).await?;
                (locked, permit)
            }
        };
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(tenant, cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values that would otherwise cause on-demand downloads during restart.
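    /// Concretely, that is the initial logical size at the last-record LSN and the keyspace
    /// collection (partitioning) that the first compaction would otherwise have to redo.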
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(
        &self,
        guard: &GateGuard,
        ctx: &RequestContext,
    ) {
        let lsn = self.get_last_record_lsn();

        // imitate the on-restart initial logical size calculation
        let size = self
            .calculate_logical_size(
                lsn,
                LogicalSizeCalculationCause::EvictionTaskImitation,
                guard,
                ctx,
            )
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate the repartitioning done on the first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, the logical size calculation above probably failed too, since
            // both access the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                match e {
                    CollectKeySpaceError::Cancelled => {
                        // Shutting down, ignore
                    }
                    err => {
                        warn!(
                            "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
                        );
                    }
                }
            }
        }
    }

    // Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Tenant,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver. (Size calculation is somewhat expensive in terms of CPU and IOs.)
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would run twice the number of concurrent size calculations.
        // But in practice, `p.threshold` >> the consumption metric interval, and
        // we spread out the eviction task using `random_init_delay`.
        // So, the chance of hitting the worst case is quite low in practice.
        // The synthetic size calculation is per-tenant, whereas this eviction task is
        // per-timeline, so we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            cancel,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    // It can happen sometimes that we hit this instead of the cancellation token firing above
                    Err(CalculateSyntheticSizeError::Cancelled) => {}
                    Err(e) => {
                        // We don't care about the result, but if it failed, we should log it:
                        // the consumption metrics task might be hitting a cached value and
                        // thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}