//! The per-timeline layer eviction task, which evicts data which has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!     - initial logical size
//!     - partitioning
//!     - (other currently missing unknowns)
//!
//! Items with parentheses are not (yet) touched by this task.
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
use std::{
    collections::HashMap,
    ops::ControlFlow,
    sync::Arc,
    time::{Duration, SystemTime},
};

use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},
    pgdatadir_mapping::CollectKeySpaceError,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
    },
};

use utils::completion;

use super::Timeline;

#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}

#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            Some(self.tenant_shard_id),
            Some(self.timeline_id),
            &format!(
                "layer eviction for {}/{}",
                self.tenant_shard_id, self.timeline_id
            ),
            false,
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(cancel).await;
                Ok(())
            },
        );
    }

    #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
        use crate::tenant::tasks::random_init_delay;
        {
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::OnlyImitiate(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if random_init_delay(period, &cancel).await.is_err() {
                return;
            }
        }

        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
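        // Re-read the eviction policy on every iteration, so that changes to the tenant's
        // configuration take effect without restarting this task.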
        loop {
            let policy = self.get_eviction_policy();
            let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
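                    // Sleep until the next iteration is due, but wake up early (and exit)
                    // if the task is cancelled in the meantime.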
                    if tokio::time::timeout_at(sleep_until, cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }

    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        let start = Instant::now();
        let (period, threshold) = match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                match self.eviction_iteration_threshold(p, cancel, ctx).await {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                (p.period, p.threshold)
            }
            EvictionPolicy::OnlyImitiate(p) => {
                if self.imitiate_only(p, cancel, ctx).await.is_break() {
                    return ControlFlow::Break(());
                }
                (p.period, p.threshold)
            }
        };

        let elapsed = start.elapsed();
        crate::tenant::tasks::warn_when_period_overrun(
            elapsed,
            period,
            BackgroundLoopKind::Eviction,
        );
        // FIXME: if we were to mix policies on a pageserver, we would have no way to sense this.
        // I don't think that is a relevant concern, however, and regardless, the imitation should
        // be the most costly part.
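        // Record how long the iteration took, labelled by the policy's period and
        // threshold (both in seconds).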
        crate::metrics::EVICTION_ITERATION_DURATION
            .get_metric_with_label_values(&[
                &format!("{}", period.as_secs()),
                &format!("{}", threshold.as_secs()),
            ])
            .unwrap()
            .observe(elapsed.as_secs_f64());

        ControlFlow::Continue(start + period)
    }

    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
            BackgroundLoopKind::Eviction,
            ctx,
        );

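        // Acquire a permit from the shared background-task rate limiter before doing any
        // work; bail out if either the task or the timeline is shutting down.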
        let _permit = tokio::select! {
            permit = acquire_permit => permit,
            _ = cancel.cancelled() => return ControlFlow::Break(()),
            _ = self.cancel.cancelled() => return ControlFlow::Break(()),
        };

        match self.imitate_layer_accesses(p, cancel, ctx).await {
            ControlFlow::Break(()) => return ControlFlow::Break(()),
            ControlFlow::Continue(()) => (),
        }

        #[derive(Debug, Default)]
        struct EvictionStats {
            candidates: usize,
            evicted: usize,
            errors: usize,
            not_evictable: usize,
            #[allow(dead_code)]
            skipped_for_shutdown: usize,
        }

        let mut stats = EvictionStats::default();
        // Gather layers for eviction.
        // NB: all the checks can be invalidated as soon as we release the layer map lock.
        // We don't want to hold the layer map lock during eviction, so we accept that the
        // gathered candidates may already be outdated by the time we try to evict them.

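        // Eviction replaces a resident layer with a remote one, so it is only possible
        // when remote storage is configured.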
        if self.remote_client.is_none() {
            error!("no remote storage configured, cannot evict layers");
            return ControlFlow::Continue(());
        }

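        // Spawn one `evict_and_wait` future per candidate layer onto a JoinSet so the
        // evictions can proceed concurrently while we continue scanning the layer map.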
        let mut js = tokio::task::JoinSet::new();
        {
            let guard = self.layers.read().await;
            let layers = guard.layer_map();
            for hist_layer in layers.iter_historic_layers() {
                let hist_layer = guard.get_from_desc(&hist_layer);

                // Guard against eviction while we inspect the layer: it might be that this
                // eviction_task and the disk_usage_eviction_task both select the same layers
                // to be evicted, and seemingly free up double the space. Both succeeding is
                // of no consequence.
                let guard = match hist_layer.keep_resident().await {
                    Ok(Some(l)) => l,
                    Ok(None) => continue,
                    Err(e) => {
                        // These should not happen, but we cannot make them statically
                        // impossible right now.
                        tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
                        continue;
                    }
                };

                let last_activity_ts = hist_layer.access_stats().latest_activity_or_now();

                let no_activity_for = match now.duration_since(last_activity_ts) {
                    Ok(d) => d,
                    Err(_e) => {
                        // We reach here if `now` < `last_activity_ts`, which can legitimately
                        // happen if there is an access between us getting `now`, and us getting
                        // the access stats from the layer.
                        //
                        // The other reason why it can happen is system clock skew: because
                        // SystemTime::now() is not monotonic, even if there is no access
                        // to the layer after we get `now` at the beginning of this function,
                        // it could be that `now` < `last_activity_ts`.
                        //
                        // To distinguish the cases, we would need to record `Instant`s in the
                        // access stats (i.e., monotonic timestamps). But then the timestamp
                        // values in the access stats would be meaningless outside of the
                        // pageserver process.
                        // At the time of writing, the trade-off is that access stats are more
                        // valuable than detecting clock skew.
                        continue;
                    }
                };
                let layer = guard.drop_eviction_guard();
                if no_activity_for > p.threshold {
                    // this could cause a lot of allocations in some cases
                    js.spawn(async move { layer.evict_and_wait().await });
                    stats.candidates += 1;
                }
            }
        };

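        // Wait for all spawned evictions to finish and tally the outcomes into `stats`.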
        let join_all = async move {
            while let Some(next) = js.join_next().await {
                match next {
                    Ok(Ok(())) => stats.evicted += 1,
                    Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
                        stats.not_evictable += 1;
                    }
                    Err(je) if je.is_cancelled() => unreachable!("not used"),
                    Err(je) if je.is_panic() => {
                        /* already logged */
                        stats.errors += 1;
                    }
                    Err(je) => tracing::error!("unknown JoinError: {je:?}"),
                }
            }
            stats
        };

        tokio::select! {
            stats = join_all => {
                if stats.candidates == stats.not_evictable {
                    debug!(stats=?stats, "eviction iteration complete");
                } else if stats.errors > 0 || stats.not_evictable > 0 {
                    warn!(stats=?stats, "eviction iteration complete");
                } else {
                    info!(stats=?stats, "eviction iteration complete");
                }
            }
            _ = cancel.cancelled() => {
                // just drop the joinset to "abort"
            }
        }

        ControlFlow::Continue(())
    }

    /// Like `eviction_iteration_threshold`, but without any eviction. Eviction will be done by
    /// the disk-usage-based eviction task instead.
    async fn imitiate_only(
        self: &Arc<Self>,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
            BackgroundLoopKind::Eviction,
            ctx,
        );

        let _permit = tokio::select! {
            permit = acquire_permit => permit,
            _ = cancel.cancelled() => return ControlFlow::Break(()),
            _ = self.cancel.cancelled() => return ControlFlow::Break(()),
        };

        self.imitate_layer_accesses(p, cancel, ctx).await
    }

    /// If we evict layers but keep cached values derived from those layers, then
    /// we face a storm of on-demand downloads after pageserver restart.
    /// The reason is that the restart empties the caches, and so, the values
    /// need to be re-computed by accessing layers, which we evicted while the
    /// caches were filled.
    ///
    /// Solutions here would be one of the following:
    /// 1. Have a persistent cache.
    /// 2. Count every access to a cached value towards the access stats of all layers
    ///    that were accessed to compute the value in the first place.
    /// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
    ///    get re-computed from layers, thereby counting towards layer access stats.
    /// 4. Make the eviction task imitate the layer accesses that typically hit caches.
    ///
    /// We follow approach (4) here because in the Neon production deployment:
    /// - the page cache is quite small => high churn => low hit rate
    ///   => eviction gets correct access stats
    /// - value-level caches such as logical size & partitioning have a high hit rate,
    ///   especially for inactive tenants
    ///   => eviction sees zero accesses for these
    ///   => they cause the on-demand download storm on pageserver restart
    ///
    /// We should probably move to persistent caches in the future, or avoid
    /// having inactive tenants attached to the pageserver in the first place.
    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        if !self.tenant_shard_id.is_zero() {
            // Shards != 0 do not maintain accurate relation sizes, and do not need to calculate
            // logical size for consumption metrics (consumption metrics are only sent from
            // shard 0). We may therefore skip imitating logical size accesses for eviction
            // purposes.
            return ControlFlow::Continue(());
        }

        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only run the imitation approximately as often as the threshold. Run it a little
        // more frequently, to avoid this period racing with the threshold/period-th eviction
        // iteration.
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(ctx).await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id, true) {
            Ok(t) => t,
            Err(_) => {
                return ControlFlow::Break(());
            }
        };
        let mut state = tenant.eviction_task_tenant_state.lock().await;
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
        let lsn = self.get_last_record_lsn();

        // imitate the on-restart initial logical size calculation
        let size = self
            .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate the repartitioning done on the first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, the logical size calculation probably failed too, because
            // both use the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                match e {
                    CollectKeySpaceError::Cancelled => {
                        // Shutting down, ignore
                    }
                    err => {
                        warn!(
                            "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
                        );
                    }
                }
            }
        }
    }

    // Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Arc<Tenant>,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver. (Size calculation is somewhat expensive in terms of CPU and IOs.)
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as the `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would have twice the number of concurrent size calculations.
        // But in practice, `p.threshold` >> `consumption metric interval`, and
        // we spread out the eviction tasks using `random_init_delay`.
        // So, the chance of the worst case is quite low in practice.
        // The synthetic size calculation is tenant-scoped, but eviction_task.rs is
        // per-timeline, so we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

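        // The gathered size inputs are discarded; the point of this call is only to touch
        // the same layers that the real synthetic size calculation would touch.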
        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            cancel,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    Err(e) => {
                        // We don't care about the result, but, if it failed, we should log it,
                        // since the consumption metrics task might be hitting the cached value
                        // and thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}