//! The per-timeline layer eviction task, which evicts data that has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!   - initial logical size
//!   - partitioning
//!   - (other currently missing unknowns)
//!
//! Items with parentheses are not (yet) touched by this task.
//!
//! See the write-up on the restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
use std::{
    collections::HashMap,
    ops::ControlFlow,
    sync::Arc,
    time::{Duration, SystemTime},
};

use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},
    pgdatadir_mapping::CollectKeySpaceError,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
    },
};

use utils::completion;

use super::Timeline;
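/// Per-timeline state for the eviction task: tracks when this timeline's cached-value
/// accesses were last imitated.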
#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}
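/// Per-tenant state for the eviction task: tracks when the tenant-scoped synthetic size
/// calculation was last imitated.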
#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
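    /// Spawn the eviction task for this timeline on the background runtime. The task waits
    /// for `background_tasks_can_start` (if given) before running its first iteration.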
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            Some(self.tenant_shard_id),
            Some(self.timeline_id),
            &format!(
                "layer eviction for {}/{}",
                self.tenant_shard_id, self.timeline_id
            ),
            false,
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(cancel).await;
                Ok(())
            },
        );
    }
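    /// Main loop of the eviction task: sleep for a randomized initial delay, then run one
    /// eviction iteration per policy period until cancelled.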
    #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
        use crate::tenant::tasks::random_init_delay;
        {
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if random_init_delay(period, &cancel).await.is_err() {
                return;
            }
        }

        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
        loop {
            let policy = self.get_eviction_policy();
            let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
                    if tokio::time::timeout_at(sleep_until, cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }
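    /// Run a single eviction iteration for the given policy and return the instant at which
    /// the next iteration should start, or `Break` on cancellation.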
    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                let start = Instant::now();
                match self.eviction_iteration_threshold(p, cancel, ctx).await {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                let elapsed = start.elapsed();
                crate::tenant::tasks::warn_when_period_overrun(
                    elapsed,
                    p.period,
                    BackgroundLoopKind::Eviction,
                );
                crate::metrics::EVICTION_ITERATION_DURATION
                    .get_metric_with_label_values(&[
                        &format!("{}", p.period.as_secs()),
                        &format!("{}", p.threshold.as_secs()),
                    ])
                    .unwrap()
                    .observe(elapsed.as_secs_f64());
                ControlFlow::Continue(start + p.period)
            }
        }
    }
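    /// Evict resident layers whose last recorded access is older than `p.threshold`, after
    /// first imitating the layer accesses that normally only hit in-memory caches.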
    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
            BackgroundLoopKind::Eviction,
            ctx,
        );

        let _permit = tokio::select! {
            permit = acquire_permit => permit,
            _ = cancel.cancelled() => return ControlFlow::Break(()),
            _ = self.cancel.cancelled() => return ControlFlow::Break(()),
        };
        // If we evict layers but keep cached values derived from those layers, then
        // we face a storm of on-demand downloads after pageserver restart.
        // The reason is that the restart empties the caches, and so, the values
        // need to be re-computed by accessing layers, which we evicted while the
        // caches were filled.
        //
        // Solutions here would be one of the following:
        // 1. Have a persistent cache.
        // 2. Count every access to a cached value towards the access stats of all layers
        //    that were accessed to compute the value in the first place.
        // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
        //    get re-computed from layers, thereby counting towards layer access stats.
        // 4. Make the eviction task imitate the layer accesses that typically hit caches.
        //
        // We follow approach (4) here because in the Neon prod deployment:
        // - the page cache is quite small => high churn => low hit rate
        //   => eviction gets correct access stats
        // - value-level caches such as logical size & repartitioning have a high hit rate,
        //   especially for inactive tenants
        //   => eviction sees zero accesses for these
        //   => they cause the on-demand download storm on pageserver restart
        //
        // We should probably move to persistent caches in the future, or avoid
        // having inactive tenants attached to pageserver in the first place.
        match self.imitate_layer_accesses(p, cancel, ctx).await {
            ControlFlow::Break(()) => return ControlFlow::Break(()),
            ControlFlow::Continue(()) => (),
        }
        #[allow(dead_code)]
        #[derive(Debug, Default)]
        struct EvictionStats {
            candidates: usize,
            evicted: usize,
            errors: usize,
            not_evictable: usize,
            skipped_for_shutdown: usize,
        }

        let mut stats = EvictionStats::default();
        // Gather layers for eviction.
        // NB: all the checks can be invalidated as soon as we release the layer map lock;
        // we don't want to hold the layer map lock during eviction, so we just need to
        // deal with that.

        if self.remote_client.is_none() {
            error!("no remote storage configured, cannot evict layers");
            return ControlFlow::Continue(());
        }
        let mut js = tokio::task::JoinSet::new();
        {
            let guard = self.layers.read().await;
            let layers = guard.layer_map();
            for hist_layer in layers.iter_historic_layers() {
                let hist_layer = guard.get_from_desc(&hist_layer);

                // Guard against eviction while we inspect it; it might be that eviction_task and
                // disk_usage_eviction_task both select the same layers to be evicted, and
                // seemingly free up double the space. Both succeeding is of no consequence.
                let guard = match hist_layer.keep_resident().await {
                    Ok(Some(l)) => l,
                    Ok(None) => continue,
                    Err(e) => {
                        // these should not happen, but we cannot make them statically impossible right
                        // now.
                        tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
                        continue;
                    }
                };

                let last_activity_ts = hist_layer.access_stats().latest_activity_or_now();

                let no_activity_for = match now.duration_since(last_activity_ts) {
                    Ok(d) => d,
                    Err(_e) => {
                        // We reach here if `now` < `last_activity_ts`, which can legitimately
                        // happen if there is an access between us getting `now`, and us getting
                        // the access stats from the layer.
                        //
                        // The other reason why it can happen is system clock skew: because
                        // SystemTime::now() is not monotonic, even if there is no access
                        // to the layer after we get `now` at the beginning of this function,
                        // it could be that `now` < `last_activity_ts`.
                        //
                        // To distinguish the cases, we would need to record `Instant`s in the
                        // access stats (i.e., monotonic timestamps), but then the timestamp
                        // values in the access stats would be meaningless outside of the
                        // pageserver process. At the time of writing, the trade-off is that
                        // access stats are more valuable than detecting clock skew.
                        continue;
                    }
                };
                let layer = guard.drop_eviction_guard();
                if no_activity_for > p.threshold {
                    // this could cause a lot of allocations in some cases
                    js.spawn(async move { layer.evict_and_wait().await });
                    stats.candidates += 1;
                }
            }
        };
        let join_all = async move {
            while let Some(next) = js.join_next().await {
                match next {
                    Ok(Ok(())) => stats.evicted += 1,
                    Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
                        stats.not_evictable += 1;
                    }
                    Err(je) if je.is_cancelled() => unreachable!("not used"),
                    Err(je) if je.is_panic() => {
                        /* already logged */
                        stats.errors += 1;
                    }
                    Err(je) => tracing::error!("unknown JoinError: {je:?}"),
                }
            }
            stats
        };

        tokio::select! {
            stats = join_all => {
                if stats.candidates == stats.not_evictable {
                    debug!(stats=?stats, "eviction iteration complete");
                } else if stats.errors > 0 || stats.not_evictable > 0 {
                    warn!(stats=?stats, "eviction iteration complete");
                } else {
                    info!(stats=?stats, "eviction iteration complete");
                }
            }
            _ = cancel.cancelled() => {
                // just drop the joinset to "abort"
            }
        }

        ControlFlow::Continue(())
    }
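    /// Imitate the layer accesses that cached values (initial logical size, partitioning,
    /// synthetic size) would otherwise hide from the per-layer access stats.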
    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        if !self.tenant_shard_id.is_zero() {
            // Shards != 0 do not maintain accurate relation sizes, and do not need to calculate
            // logical size for consumption metrics (consumption metrics are only sent from shard 0).
            // We may therefore skip imitating logical size accesses for eviction purposes.
            return ControlFlow::Continue(());
        }

        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only imitate the layer accesses approximately as often as the threshold, and a little
        // more frequently than that, to avoid this period racing with the threshold/period-th
        // eviction iteration.
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(ctx).await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id, true) {
            Ok(t) => t,
            Err(_) => {
                return ControlFlow::Break(());
            }
        };
        let mut state = tenant.eviction_task_tenant_state.lock().await;
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }
    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
        let lsn = self.get_last_record_lsn();

        // imitate the on-restart initial logical size calculation
        let size = self
            .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate the repartitioning done on first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, we probably failed logical size because these use the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                match e {
                    CollectKeySpaceError::Cancelled => {
                        // Shutting down, ignore
                    }
                    err => {
                        warn!(
                            "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
                        );
                    }
                }
            }
        }
    }
    // Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Arc<Tenant>,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, there is no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver (size calculation is somewhat expensive in terms of CPU and IOs).
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would have twice the number of concurrent size calculations.
        // But in practice, `p.threshold` >> the consumption metric interval, and
        // we spread out the eviction task using `random_init_delay`,
        // so the chance of hitting the worst case is quite low.
        // The calculation is per-tenant, but eviction_task.rs is per-timeline,
        // so we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            cancel,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    Err(e) => {
                        // We don't care about the result, but, if it failed, we should log it,
                        // since the consumption metrics task might be hitting the cached value
                        // and thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}