Line data Source code
1 : //! The per-timeline layer eviction task, which evicts data which has not been accessed for more
2 : //! than a given threshold.
3 : //!
4 : //! Data includes all kinds of caches, namely:
5 : //! - (in-memory layers)
6 : //! - on-demand downloaded layer files on disk
7 : //! - (cached layer file pages)
8 : //! - derived data from layer file contents, namely:
9 : //! - initial logical size
10 : //! - partitioning
11 : //! - (other currently missing unknowns)
12 : //!
13 : //! Items with parentheses are not (yet) touched by this task.
14 : //!
15 : //! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
16 : use std::collections::HashMap;
17 : use std::ops::ControlFlow;
18 : use std::sync::Arc;
19 : use std::time::{Duration, SystemTime};
20 :
21 : use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
22 : use tokio::time::Instant;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::{Instrument, debug, info, info_span, instrument, warn};
25 : use utils::completion;
26 : use utils::sync::gate::GateGuard;
27 :
28 : use super::Timeline;
29 : use crate::context::{DownloadBehavior, RequestContext};
30 : use crate::pgdatadir_mapping::CollectKeySpaceError;
31 : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};
32 : use crate::tenant::size::CalculateSyntheticSizeError;
33 : use crate::tenant::storage_layer::LayerVisibilityHint;
34 : use crate::tenant::tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit, sleep_random};
35 : use crate::tenant::timeline::EvictionError;
36 : use crate::tenant::{LogicalSizeCalculationCause, Tenant};
37 :
38 : #[derive(Default)]
39 : pub struct EvictionTaskTimelineState {
40 : last_layer_access_imitation: Option<tokio::time::Instant>,
41 : }
42 :
43 : #[derive(Default)]
44 : pub struct EvictionTaskTenantState {
45 : last_layer_access_imitation: Option<Instant>,
46 : }
47 :
48 : impl Timeline {
49 0 : pub(super) fn launch_eviction_task(
50 0 : self: &Arc<Self>,
51 0 : parent: Arc<Tenant>,
52 0 : background_tasks_can_start: Option<&completion::Barrier>,
53 0 : ) {
54 0 : let self_clone = Arc::clone(self);
55 0 : let background_tasks_can_start = background_tasks_can_start.cloned();
56 0 : task_mgr::spawn(
57 0 : BACKGROUND_RUNTIME.handle(),
58 0 : TaskKind::Eviction,
59 0 : self.tenant_shard_id,
60 0 : Some(self.timeline_id),
61 0 : &format!(
62 0 : "layer eviction for {}/{}",
63 0 : self.tenant_shard_id, self.timeline_id
64 0 : ),
65 0 : async move {
66 0 : tokio::select! {
67 0 : _ = self_clone.cancel.cancelled() => { return Ok(()); }
68 0 : _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
69 0 : };
70 0 :
71 0 : self_clone.eviction_task(parent).await;
72 0 : Ok(())
73 0 : },
74 0 : );
75 0 : }
76 :
77 : #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
78 : async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
79 : // acquire the gate guard only once within a useful span
80 : let Ok(guard) = self.gate.enter() else {
81 : return;
82 : };
83 :
84 : {
85 : let policy = self.get_eviction_policy();
86 : let period = match policy {
87 : EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
88 : EvictionPolicy::OnlyImitiate(lat) => lat.period,
89 : EvictionPolicy::NoEviction => Duration::from_secs(10),
90 : };
91 : if sleep_random(period, &self.cancel).await.is_err() {
92 : return;
93 : }
94 : }
95 :
96 : let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn)
97 : .with_scope_timeline(&self);
98 : loop {
99 : let policy = self.get_eviction_policy();
100 : let cf = self
101 : .eviction_iteration(&tenant, &policy, &self.cancel, &guard, &ctx)
102 : .await;
103 :
104 : match cf {
105 : ControlFlow::Break(()) => break,
106 : ControlFlow::Continue(sleep_until) => {
107 : if tokio::time::timeout_at(sleep_until, self.cancel.cancelled())
108 : .await
109 : .is_ok()
110 : {
111 : break;
112 : }
113 : }
114 : }
115 : }
116 : }
117 :
118 : #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
119 : async fn eviction_iteration(
120 : self: &Arc<Self>,
121 : tenant: &Tenant,
122 : policy: &EvictionPolicy,
123 : cancel: &CancellationToken,
124 : gate: &GateGuard,
125 : ctx: &RequestContext,
126 : ) -> ControlFlow<(), Instant> {
127 : debug!("eviction iteration: {policy:?}");
128 : let start = Instant::now();
129 : let (period, threshold) = match policy {
130 : EvictionPolicy::NoEviction => {
131 : // check again in 10 seconds; XXX config watch mechanism
132 : return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
133 : }
134 : EvictionPolicy::LayerAccessThreshold(p) => {
135 : match self
136 : .eviction_iteration_threshold(tenant, p, cancel, gate, ctx)
137 : .await
138 : {
139 : ControlFlow::Break(()) => return ControlFlow::Break(()),
140 : ControlFlow::Continue(()) => (),
141 : }
142 : (p.period, p.threshold)
143 : }
144 : EvictionPolicy::OnlyImitiate(p) => {
145 : if self
146 : .imitiate_only(tenant, p, cancel, gate, ctx)
147 : .await
148 : .is_break()
149 : {
150 : return ControlFlow::Break(());
151 : }
152 : (p.period, p.threshold)
153 : }
154 : };
155 :
156 : let elapsed = start.elapsed();
157 : crate::tenant::tasks::warn_when_period_overrun(
158 : elapsed,
159 : period,
160 : BackgroundLoopKind::Eviction,
161 : );
162 : // FIXME: if we were to mix policies on a pageserver, we would have no way to sense this. I
163 : // don't think that is a relevant fear however, and regardless the imitation should be the
164 : // most costly part.
165 : crate::metrics::EVICTION_ITERATION_DURATION
166 : .get_metric_with_label_values(&[
167 : &format!("{}", period.as_secs()),
168 : &format!("{}", threshold.as_secs()),
169 : ])
170 : .unwrap()
171 : .observe(elapsed.as_secs_f64());
172 :
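// Schedule the next iteration `period` after this one *started*, not after it finished;
// if the iteration overran the period, the caller's `timeout_at` above fires immediately.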
173 : ControlFlow::Continue(start + period)
174 : }
175 :
176 0 : async fn eviction_iteration_threshold(
177 0 : self: &Arc<Self>,
178 0 : tenant: &Tenant,
179 0 : p: &EvictionPolicyLayerAccessThreshold,
180 0 : cancel: &CancellationToken,
181 0 : gate: &GateGuard,
182 0 : ctx: &RequestContext,
183 0 : ) -> ControlFlow<()> {
184 0 : let now = SystemTime::now();
185 :
186 0 : let permit = self.acquire_imitation_permit(cancel, ctx).await?;
187 :
188 0 : self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
189 0 : .await?;
190 :
191 : #[derive(Debug, Default)]
192 : struct EvictionStats {
193 : candidates: usize,
194 : evicted: usize,
195 : errors: usize,
196 : not_evictable: usize,
197 : timeouts: usize,
198 : #[allow(dead_code)]
199 : skipped_for_shutdown: usize,
200 : }
201 :
202 0 : let mut stats = EvictionStats::default();
203 0 : // Gather layers for eviction.
204 0 : // NB: all the checks can be invalidated as soon as we release the layer map lock.
205 0 : // We don't want to hold the layer map lock during eviction.
206 0 :
207 0 : // So, we just need to deal with this.
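// (A layer selected below may be accessed again or removed before its eviction runs;
// `evict_and_wait` detects this and such layers are counted as `not_evictable`.)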
208 0 :
209 0 : let mut js = tokio::task::JoinSet::new();
210 : {
211 0 : let guard = self.layers.read().await;
212 :
213 0 : guard
214 0 : .likely_resident_layers()
215 0 : .filter(|layer| {
216 0 : let last_activity_ts = layer.latest_activity();
217 :
218 0 : let no_activity_for = match now.duration_since(last_activity_ts) {
219 0 : Ok(d) => d,
220 0 : Err(_e) => {
221 0 : // We reach here if `now` < `last_activity_ts`, which can legitimately
222 0 : // happen if there is an access between us getting `now`, and us getting
223 0 : // the access stats from the layer.
224 0 : //
225 0 : // The other reason why it can happen is system clock skew because
226 0 : // SystemTime::now() is not monotonic, so, even if there is no access
227 0 : // to the layer after we get `now` at the beginning of this function,
228 0 : // it could be that `now` < `last_activity_ts`.
229 0 : //
230 0 : // To distinguish the cases, we would need to record `Instant`s in the
231 0 : // access stats (i.e., monotonic timestamps), but then, the timestamps
232 0 : // values in the access stats would need to be `Instant`'s, and hence
233 0 : // they would be meaningless outside of the pageserver process.
234 0 : // At the time of writing, the trade-off is that access stats are more
235 0 : // valuable than detecting clock skew.
236 0 : return false;
237 : }
238 : };
239 :
240 0 : match layer.visibility() {
241 : LayerVisibilityHint::Visible => {
242 : // Usual case: a visible layer might be read any time, and we will keep it
243 : // resident until it hits our configured TTL threshold.
244 0 : no_activity_for > p.threshold
245 : }
246 : LayerVisibilityHint::Covered => {
247 : // Covered layers: this is probably a layer that was recently covered by
248 : // an image layer during compaction. We don't evict it immediately, but
249 : // it doesn't stay resident for the full `threshold`: we just keep it
250 : // for a shorter time in case
251 : // - it is used for Timestamp->LSN lookups
252 : // - a new branch is created in recent history which will read this layer
253 0 : no_activity_for > p.period
254 : }
255 : }
256 0 : })
257 0 : .cloned()
258 0 : .for_each(|layer| {
259 0 : js.spawn(async move {
260 0 : layer
261 0 : .evict_and_wait(std::time::Duration::from_secs(5))
262 0 : .await
263 0 : });
264 0 : stats.candidates += 1;
265 0 : });
266 0 : };
267 0 :
268 0 : let join_all = async move {
269 0 : while let Some(next) = js.join_next().await {
270 0 : match next {
271 0 : Ok(Ok(())) => stats.evicted += 1,
272 0 : Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
273 0 : stats.not_evictable += 1;
274 0 : }
275 0 : Ok(Err(EvictionError::Timeout)) => {
276 0 : stats.timeouts += 1;
277 0 : }
278 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
279 0 : Err(je) if je.is_panic() => {
280 0 : /* already logged */
281 0 : stats.errors += 1;
282 0 : }
283 0 : Err(je) => tracing::error!("unknown JoinError: {je:?}"),
284 : }
285 : }
286 0 : stats
287 0 : };
288 :
289 0 : tokio::select! {
290 0 : stats = join_all => {
291 0 : if stats.candidates == stats.not_evictable {
292 0 : debug!(stats=?stats, "eviction iteration complete");
293 0 : } else if stats.errors > 0 || stats.not_evictable > 0 || stats.timeouts > 0 {
294 : // reminder: timeouts are not eviction cancellations
295 0 : warn!(stats=?stats, "eviction iteration complete");
296 : } else {
297 0 : info!(stats=?stats, "eviction iteration complete");
298 : }
299 : }
300 0 : _ = cancel.cancelled() => {
301 0 : // just drop the joinset to "abort"
302 0 : }
303 : }
304 :
305 0 : ControlFlow::Continue(())
306 0 : }
307 :
308 : /// Like `eviction_iteration_threshold`, but without any eviction. Eviction will be done by
309 : /// the disk-usage-based eviction task.
310 0 : async fn imitiate_only(
311 0 : self: &Arc<Self>,
312 0 : tenant: &Tenant,
313 0 : p: &EvictionPolicyLayerAccessThreshold,
314 0 : cancel: &CancellationToken,
315 0 : gate: &GateGuard,
316 0 : ctx: &RequestContext,
317 0 : ) -> ControlFlow<()> {
318 0 : let permit = self.acquire_imitation_permit(cancel, ctx).await?;
319 :
320 0 : self.imitate_layer_accesses(tenant, p, cancel, gate, permit, ctx)
321 0 : .await
322 0 : }
323 :
324 0 : async fn acquire_imitation_permit(
325 0 : &self,
326 0 : cancel: &CancellationToken,
327 0 : ctx: &RequestContext,
328 0 : ) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
329 0 : let acquire_permit =
330 0 : crate::tenant::tasks::acquire_concurrency_permit(BackgroundLoopKind::Eviction, ctx);
331 0 :
332 0 : tokio::select! {
333 0 : permit = acquire_permit => ControlFlow::Continue(permit),
334 0 : _ = cancel.cancelled() => ControlFlow::Break(()),
335 0 : _ = self.cancel.cancelled() => ControlFlow::Break(()),
336 : }
337 0 : }
338 :
339 : /// If we evict layers but keep cached values derived from those layers, then
340 : /// we face a storm of on-demand downloads after pageserver restart.
341 : /// The reason is that the restart empties the caches, and so, the values
342 : /// need to be re-computed by accessing layers, which we evicted while the
343 : /// caches were filled.
344 : ///
345 : /// Solutions here would be one of the following:
346 : /// 1. Have a persistent cache.
347 : /// 2. Count every access to a cached value to the access stats of all layers
348 : /// that were accessed to compute the value in the first place.
349 : /// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
350 : /// get re-computed from layers, thereby counting towards layer access stats.
351 : /// 4. Make the eviction task imitate the layer accesses that typically hit caches.
352 : ///
353 : /// We follow approach (4) here because in Neon prod deployment:
354 : /// - page cache is quite small => high churn => low hit rate
355 : /// => eviction gets correct access stats
356 : /// - value-level caches such as logical size & repartitioning have a high hit rate,
357 : /// especially for inactive tenants
358 : /// => eviction sees zero accesses for these
359 : /// => they cause the on-demand download storm on pageserver restart
360 : ///
361 : /// We should probably move to persistent caches in the future, or avoid
362 : /// having inactive tenants attached to pageserver in the first place.
363 : #[instrument(skip_all)]
364 : async fn imitate_layer_accesses(
365 : &self,
366 : tenant: &Tenant,
367 : p: &EvictionPolicyLayerAccessThreshold,
368 : cancel: &CancellationToken,
369 : gate: &GateGuard,
370 : permit: BackgroundLoopSemaphorePermit<'static>,
371 : ctx: &RequestContext,
372 : ) -> ControlFlow<()> {
373 : if !self.tenant_shard_id.is_shard_zero() {
374 : // Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size
375 : // for consumption metrics (consumption metrics are only sent from shard 0). We may therefore
376 : // skip imitating logical size accesses for eviction purposes.
377 : return ControlFlow::Continue(());
378 : }
379 :
380 : let mut state = self.eviction_task_timeline_state.lock().await;
381 :
382 : // Only do the imitate_layer accesses approximately as often as the threshold. A little
383 : // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
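// Illustrative numbers (not the configured defaults): with threshold = 24h and period = 1h,
// we imitate at most once every 23h; if the period exceeds the threshold, we fall back to the threshold.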
384 : let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);
385 :
386 : match state.last_layer_access_imitation {
387 : Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
388 : _ => {
389 : self.imitate_timeline_cached_layer_accesses(gate, ctx).await;
390 : state.last_layer_access_imitation = Some(tokio::time::Instant::now())
391 : }
392 : }
393 : drop(state);
394 :
395 : if cancel.is_cancelled() {
396 : return ControlFlow::Break(());
397 : }
398 :
399 : // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
400 : // Make one of the tenant's timelines draw the short straw and run the calculation.
401 : // The others wait until the calculation is done so that they take into account the
402 : // imitated accesses that the winner made.
403 : let (mut state, _permit) = {
404 : if let Ok(locked) = tenant.eviction_task_tenant_state.try_lock() {
405 : (locked, permit)
406 : } else {
407 : // we might need to wait for a long time here in case of pathological synthetic
408 : // size calculation performance
409 : drop(permit);
410 : let locked = tokio::select! {
411 : locked = tenant.eviction_task_tenant_state.lock() => locked,
412 : _ = self.cancel.cancelled() => {
413 : return ControlFlow::Break(())
414 : },
415 : _ = cancel.cancelled() => {
416 : return ControlFlow::Break(())
417 : }
418 : };
419 : // then reacquire -- this will be bad if there is a lot of traffic, but because we
420 : // released the permit, the overall latency will be much better.
421 : let permit = self.acquire_imitation_permit(cancel, ctx).await?;
422 : (locked, permit)
423 : }
424 : };
425 : match state.last_layer_access_imitation {
426 : Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
427 : _ => {
428 : self.imitate_synthetic_size_calculation_worker(tenant, cancel, ctx)
429 : .await;
430 : state.last_layer_access_imitation = Some(tokio::time::Instant::now());
431 : }
432 : }
433 : drop(state);
434 :
435 : if cancel.is_cancelled() {
436 : return ControlFlow::Break(());
437 : }
438 :
439 : ControlFlow::Continue(())
440 : }
441 :
442 : /// Recompute the values which would cause on-demand downloads during restart.
443 : #[instrument(skip_all)]
444 : async fn imitate_timeline_cached_layer_accesses(
445 : &self,
446 : guard: &GateGuard,
447 : ctx: &RequestContext,
448 : ) {
449 : let lsn = self.get_last_record_lsn();
450 :
451 : // imitate on-restart initial logical size
452 : let size = self
453 : .calculate_logical_size(
454 : lsn,
455 : LogicalSizeCalculationCause::EvictionTaskImitation,
456 : guard,
457 : ctx,
458 : )
459 : .instrument(info_span!("calculate_logical_size"))
460 : .await;
461 :
462 : match &size {
463 : Ok(_size) => {
464 : // good, don't log it to avoid confusion
465 : }
466 : Err(_) => {
467 : // we have known issues for which we already log this on consumption metrics,
468 : // gc, and compaction. leave logging out for now.
469 : //
470 : // https://github.com/neondatabase/neon/issues/2539
471 : }
472 : }
473 :
474 : // imitate repartitioning on first compaction
475 : if let Err(e) = self
476 : .collect_keyspace(lsn, ctx)
477 : .instrument(info_span!("collect_keyspace"))
478 : .await
479 : {
480 : // if this failed, we probably failed logical size because these use the same keys
481 : if size.is_err() {
482 : // ignore, see above comment
483 : } else {
484 : match e {
485 : CollectKeySpaceError::Cancelled => {
486 : // Shutting down, ignore
487 : }
488 : err => {
489 : warn!(
490 : "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
491 : );
492 : }
493 : }
494 : }
495 : }
496 : }
497 :
498 : // Imitate the synthetic size calculation done by the consumption_metrics module.
499 : #[instrument(skip_all)]
500 : async fn imitate_synthetic_size_calculation_worker(
501 : &self,
502 : tenant: &Tenant,
503 : cancel: &CancellationToken,
504 : ctx: &RequestContext,
505 : ) {
506 : if self.conf.metric_collection_endpoint.is_none() {
507 : // We don't start the consumption metrics task if this is not set in the config.
508 : // So, no need to imitate the accesses in that case.
509 : return;
510 : }
511 :
512 : // The consumption metrics are collected on a per-tenant basis, by a single
513 : // global background loop.
514 : // It limits the number of synthetic size calculations using the global
515 : // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
516 : // the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
517 : //
518 : // If we used that same semaphore here, then we'd compete for the
519 : // same permits, which may impact timeliness of consumption metrics.
520 : // That is a no-go, as consumption metrics are much more important
521 : // than what we do here.
522 : //
523 : // So, we have a separate semaphore, initialized to the same
524 : // number of permits as the `concurrent_tenant_size_logical_size_queries`.
525 : // In the worst case, we would have twice the number of concurrent size calculations.
526 : // But in practice, the `p.threshold` >> `consumption metric interval`, and
527 : // we spread out the eviction task using `sleep_random`.
528 : // So, the chance of the worst case is quite low in practice.
529 : // Synthetic size calculation runs as a per-tenant task, but eviction_task.rs is per-timeline.
530 : // So, we must coordinate with the other eviction tasks of this tenant.
531 : let limit = self
532 : .conf
533 : .eviction_task_immitated_concurrent_logical_size_queries
534 : .inner();
535 :
536 : let mut throwaway_cache = HashMap::new();
537 : let gather = crate::tenant::size::gather_inputs(
538 : tenant,
539 : limit,
540 : None,
541 : &mut throwaway_cache,
542 : LogicalSizeCalculationCause::EvictionTaskImitation,
543 : cancel,
544 : ctx,
545 : )
546 : .instrument(info_span!("gather_inputs"));
547 :
548 : tokio::select! {
549 : _ = cancel.cancelled() => {}
550 : gather_result = gather => {
551 : match gather_result {
552 : Ok(_) => {},
553 : // It can happen sometimes that we hit this instead of the cancellation token firing above
554 : Err(CalculateSyntheticSizeError::Cancelled) => {}
555 : Err(e) => {
556 : // We don't care about the result, but, if it failed, we should log it,
557 0 : // since consumption metrics might be hitting the cached value and
558 : // thus not encountering this error.
559 : warn!("failed to imitate synthetic size calculation accesses: {e:#}")
560 : }
561 : }
562 : }
563 : }
564 : }
565 : }