//! The per-timeline layer eviction task, which evicts data which has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!   - initial logical size
//!   - partitioning
//!   - (other currently missing unknowns)
//!
//! Items with parentheses are not (yet) touched by this task.
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
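//!
//! A minimal sketch of the policy variants this task switches on (mirroring the
//! `pageserver_api::models` types imported below; the values are illustrative, not defaults):
//!
//! ```ignore
//! use std::time::Duration;
//! use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
//!
//! // Evict layers not accessed for `threshold`, re-checking every `period`.
//! let _policy = EvictionPolicy::LayerAccessThreshold(EvictionPolicyLayerAccessThreshold {
//!     period: Duration::from_secs(600),
//!     threshold: Duration::from_secs(24 * 3600),
//! });
//! // `EvictionPolicy::OnlyImitiate(..)` only imitates accesses (eviction is left to the
//! // disk-usage-based task), and `EvictionPolicy::NoEviction` disables this task's work.
//! ```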
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, info, info_span, instrument, warn};
use utils::completion;
use utils::sync::gate::GateGuard;

use super::Timeline;
use crate::context::{DownloadBehavior, RequestContext};
use crate::pgdatadir_mapping::CollectKeySpaceError;
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};
use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::storage_layer::LayerVisibilityHint;
use crate::tenant::tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit, sleep_random};
use crate::tenant::timeline::EvictionError;
use crate::tenant::{LogicalSizeCalculationCause, Tenant};

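/// Per-timeline state for the eviction task: when this timeline's cached-value layer
/// accesses were last imitated.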
#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}

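/// Per-tenant counterpart of [`EvictionTaskTimelineState`]: when the tenant-scoped
/// imitation (the synthetic size calculation) last ran.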
#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        parent: Arc<Tenant>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            self.tenant_shard_id,
            Some(self.timeline_id),
            &format!(
                "layer eviction for {}/{}",
                self.tenant_shard_id, self.timeline_id
            ),
            async move {
                tokio::select! {
                    _ = self_clone.cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(parent).await;
                Ok(())
            },
        );
    }

    #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
        // acquire the gate guard only once within a useful span
        let Ok(guard) = self.gate.enter() else {
            return;
        };

        {
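            // Randomize the start of the first iteration (see `sleep_random`) so that the
            // eviction loops of different timelines don't all wake up in lockstep.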
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::OnlyImitiate(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if sleep_random(period, &self.cancel).await.is_err() {
                return;
            }
        }

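        // This task is not expected to trigger on-demand downloads; `DownloadBehavior::Warn`
        // lets any download it does cause go through, but logs it as a warning.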
        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
        loop {
            let policy = self.get_eviction_policy();
            let cf = self
                .eviction_iteration(&tenant, &policy, &self.cancel, &guard, &ctx)
                .await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
                    if tokio::time::timeout_at(sleep_until, self.cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }

    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        tenant: &Tenant,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        let start = Instant::now();
        let (period, threshold) = match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                match self
                    .eviction_iteration_threshold(tenant, p, cancel, gate, ctx)
                    .await
                {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                (p.period, p.threshold)
            }
            EvictionPolicy::OnlyImitiate(p) => {
                if self
                    .imitiate_only(tenant, p, cancel, gate, ctx)
                    .await
                    .is_break()
                {
                    return ControlFlow::Break(());
                }
                (p.period, p.threshold)
            }
        };

        let elapsed = start.elapsed();
        crate::tenant::tasks::warn_when_period_overrun(
            elapsed,
            period,
            BackgroundLoopKind::Eviction,
        );
        // FIXME: if we were to mix policies on a pageserver, we would have no way to sense this. I
        // don't think that is a relevant fear however, and regardless the imitation should be the
        // most costly part.
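        // Histogram of iteration duration, labelled by the configured period and threshold
        // (both rendered as whole seconds).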
        crate::metrics::EVICTION_ITERATION_DURATION
            .get_metric_with_label_values(&[
                &format!("{}", period.as_secs()),
                &format!("{}", threshold.as_secs()),
            ])
            .unwrap()
            .observe(elapsed.as_secs_f64());

        ControlFlow::Continue(start + period)
    }

    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        tenant: &Tenant,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        let permit = self.acquire_imitation_permit(cancel, ctx).await?;

        self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
            .await?;

        #[derive(Debug, Default)]
        struct EvictionStats {
            candidates: usize,
            evicted: usize,
            errors: usize,
            not_evictable: usize,
            timeouts: usize,
            #[allow(dead_code)]
            skipped_for_shutdown: usize,
        }

        let mut stats = EvictionStats::default();
        // Gather layers for eviction.
        // NB: all of the checks can be invalidated as soon as we release the layer map lock;
        // we don't want to hold that lock during eviction, so we accept that the decision
        // may be stale by the time the eviction actually runs.

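        // Spawn one `evict_and_wait` future per candidate layer into a JoinSet so evictions
        // run concurrently; the outcomes are tallied into `stats` below.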
        let mut js = tokio::task::JoinSet::new();
        {
            let guard = self.layers.read().await;

            guard
                .likely_resident_layers()
                .filter(|layer| {
                    let last_activity_ts = layer.latest_activity();

                    let no_activity_for = match now.duration_since(last_activity_ts) {
                        Ok(d) => d,
                        Err(_e) => {
                            // We reach here if `now` < `last_activity_ts`, which can legitimately
                            // happen if there is an access between us getting `now`, and us getting
                            // the access stats from the layer.
                            //
                            // The other reason why it can happen is system clock skew because
                            // SystemTime::now() is not monotonic, so, even if there is no access
                            // to the layer after we get `now` at the beginning of this function,
                            // it could be that `now` < `last_activity_ts`.
                            //
                            // To distinguish the cases, we would need to record `Instant`s in the
                            // access stats (i.e., monotonic timestamps), but then, the timestamps
                            // values in the access stats would need to be `Instant`'s, and hence
                            // they would be meaningless outside of the pageserver process.
                            // At the time of writing, the trade-off is that access stats are more
                            // valuable than detecting clock skew.
                            return false;
                        }
                    };

                    match layer.visibility() {
                        LayerVisibilityHint::Visible => {
                            // Usual case: a visible layer might be read any time, and we will keep it
                            // resident until it hits our configured TTL threshold.
                            no_activity_for > p.threshold
                        }
                        LayerVisibilityHint::Covered => {
                            // Covered layers: this is probably a layer that was recently covered by
                            // an image layer during compaction. We don't evict it immediately, but
                            // it doesn't stay resident for the full `threshold`: we just keep it
                            // for a shorter time in case
                            // - it is used for Timestamp->LSN lookups
                            // - a new branch is created in recent history which will read this layer
                            no_activity_for > p.period
                        }
                    }
                })
                .cloned()
                .for_each(|layer| {
                    js.spawn(async move {
                        layer
                            .evict_and_wait(std::time::Duration::from_secs(5))
                            .await
                    });
                    stats.candidates += 1;
                });
        };

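        // Drain the JoinSet and tally per-outcome counts. If the cancellation branch of the
        // `select!` below wins instead, this future is dropped along with the JoinSet, which
        // aborts any evictions still in flight.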
        let join_all = async move {
            while let Some(next) = js.join_next().await {
                match next {
                    Ok(Ok(())) => stats.evicted += 1,
                    Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
                        stats.not_evictable += 1;
                    }
                    Ok(Err(EvictionError::Timeout)) => {
                        stats.timeouts += 1;
                    }
                    Err(je) if je.is_cancelled() => unreachable!("not used"),
                    Err(je) if je.is_panic() => {
                        /* already logged */
                        stats.errors += 1;
                    }
                    Err(je) => tracing::error!("unknown JoinError: {je:?}"),
                }
            }
            stats
        };

        tokio::select! {
            stats = join_all => {
                if stats.candidates == stats.not_evictable {
                    debug!(stats=?stats, "eviction iteration complete");
                } else if stats.errors > 0 || stats.not_evictable > 0 || stats.timeouts > 0 {
                    // reminder: timeouts are not eviction cancellations
                    warn!(stats=?stats, "eviction iteration complete");
                } else {
                    info!(stats=?stats, "eviction iteration complete");
                }
            }
            _ = cancel.cancelled() => {
                // just drop the joinset to "abort"
            }
        }

        ControlFlow::Continue(())
    }

    /// Like `eviction_iteration_threshold`, but without any eviction. Eviction is left to the
    /// disk-usage-based eviction task.
    async fn imitiate_only(
        self: &Arc<Self>,
        tenant: &Tenant,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let permit = self.acquire_imitation_permit(cancel, ctx).await?;

        self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
            .await
    }

    async fn acquire_imitation_permit(
        &self,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
        let acquire_permit =
            crate::tenant::tasks::acquire_concurrency_permit(BackgroundLoopKind::Eviction, ctx);

        tokio::select! {
            permit = acquire_permit => ControlFlow::Continue(permit),
            _ = cancel.cancelled() => ControlFlow::Break(()),
            _ = self.cancel.cancelled() => ControlFlow::Break(()),
        }
    }

    /// If we evict layers but keep cached values derived from those layers, then
    /// we face a storm of on-demand downloads after pageserver restart.
    /// The reason is that the restart empties the caches, and so, the values
    /// need to be re-computed by accessing layers, which we evicted while the
    /// caches were filled.
    ///
    /// Solutions here would be one of the following:
    /// 1. Have a persistent cache.
    /// 2. Count every access to a cached value towards the access stats of all layers
    ///    that were accessed to compute the value in the first place.
    /// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
    ///    get re-computed from layers, thereby counting towards layer access stats.
    /// 4. Make the eviction task imitate the layer accesses that typically hit caches.
    ///
    /// We follow approach (4) here because in Neon's prod deployment:
    /// - the page cache is quite small => high churn => low hit rate
    ///   => eviction gets correct access stats
    /// - value-level caches such as logical size & repartitioning have a high hit rate,
    ///   especially for inactive tenants
    ///   => eviction sees zero accesses for these
    ///   => they cause the on-demand download storm on pageserver restart
    ///
    /// We should probably move to persistent caches in the future, or avoid
    /// having inactive tenants attached to pageserver in the first place.
    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        tenant: &Tenant,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        gate: &GateGuard,
        permit: BackgroundLoopSemaphorePermit<'static>,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        if !self.tenant_shard_id.is_shard_zero() {
            // Shards != 0 do not maintain accurate relation sizes, and do not need to calculate logical size
            // for consumption metrics (consumption metrics are only sent from shard 0). We may therefore
            // skip imitating logical size accesses for eviction purposes.
            return ControlFlow::Continue(());
        }

        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only imitate layer accesses approximately as often as the threshold, or slightly more
        // frequently, to avoid this period racing with the threshold/period-th eviction iteration.
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);
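        // E.g. with a period of 10 minutes and a threshold of 24 hours (illustrative values),
        // the imitation below runs at most roughly once every 23 h 50 min.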

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(gate, ctx).await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let (mut state, _permit) = {
            if let Ok(locked) = tenant.eviction_task_tenant_state.try_lock() {
                (locked, permit)
            } else {
                // we might need to wait for a long time here in case of pathological synthetic
                // size calculation performance
                drop(permit);
                let locked = tokio::select! {
                    locked = tenant.eviction_task_tenant_state.lock() => locked,
                    _ = self.cancel.cancelled() => {
                        return ControlFlow::Break(())
                    },
                    _ = cancel.cancelled() => {
                        return ControlFlow::Break(())
                    }
                };
                // then reacquire -- this will be bad if there is a lot of traffic, but because we
                // released the permit, the overall latency will be much better.
                let permit = self.acquire_imitation_permit(cancel, ctx).await?;
                (locked, permit)
            }
        };
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(tenant, cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(
        &self,
        guard: &GateGuard,
        ctx: &RequestContext,
    ) {
        let lsn = self.get_last_record_lsn();

        // imitate on-restart initial logical size
        let size = self
            .calculate_logical_size(
                lsn,
                LogicalSizeCalculationCause::EvictionTaskImitation,
                guard,
                ctx,
            )
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate repartitioning on first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, we probably failed logical size because these use the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                match e {
                    CollectKeySpaceError::Cancelled => {
                        // Shutting down, ignore
                    }
                    err => {
                        warn!(
                            "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
                        );
                    }
                }
            }
        }
    }

    // Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Tenant,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }
510 :
511 : // The consumption metrics are collected on a per-tenant basis, by a single
512 : // global background loop.
513 : // It limits the number of synthetic size calculations using the global
514 : // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
515 : // the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
516 : //
517 : // If we used that same semaphore here, then we'd compete for the
518 : // same permits, which may impact timeliness of consumption metrics.
519 : // That is a no-go, as consumption metrics are much more important
520 : // than what we do here.
521 : //
522 : // So, we have a separate semaphore, initialized to the same
523 : // number of permits as the `concurrent_tenant_size_logical_size_queries`.
524 : // In the worst, we would have twice the amount of concurrenct size calculations.
525 : // But in practice, the `p.threshold` >> `consumption metric interval`, and
526 : // we spread out the eviction task using `random_init_delay`.
527 : // So, the chance of the worst case is quite low in practice.
528 : // It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
529 : // So, we must coordinate with other with other eviction tasks of this tenant.
530 : let limit = self
531 : .conf
532 : .eviction_task_immitated_concurrent_logical_size_queries
533 : .inner();
534 :
535 : let mut throwaway_cache = HashMap::new();
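        // The computed inputs and the throwaway cache are discarded; the point is only the
        // layer accesses that `gather_inputs` performs while computing them.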
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            cancel,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    // It can happen sometimes that we hit this instead of the cancellation token firing above
                    Err(CalculateSyntheticSizeError::Cancelled) => {}
                    Err(e) => {
                        // We don't care about the result, but, if it failed, we should log it,
                        // since the consumption metrics task might be hitting the cached value and
                        // thus not encounter this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}