//! The per-timeline layer eviction task, which evicts data which has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!     - initial logical size
//!     - partitioning
//!     - (other currently missing unknowns)
//!
//! Items with parentheses are not (yet) touched by this task.
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
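//!
//! The task is driven by the tenant's `EvictionPolicy`. As a rough, illustrative sketch
//! (not normative; only the `period` and `threshold` fields that this module reads are
//! shown, and plain struct-literal construction is assumed), a threshold-based policy
//! could look like this:
//!
//! ```ignore
//! use std::time::Duration;
//! use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
//!
//! let policy = EvictionPolicy::LayerAccessThreshold(EvictionPolicyLayerAccessThreshold {
//!     period: Duration::from_secs(60),        // how often the eviction loop runs
//!     threshold: Duration::from_secs(86_400), // evict layers not accessed for this long
//! });
//! ```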
use std::{
    collections::HashMap,
    ops::ControlFlow,
    sync::Arc,
    time::{Duration, SystemTime},
};

use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},
    pgdatadir_mapping::CollectKeySpaceError,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
    },
};

use utils::completion;

use super::Timeline;

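/// Per-timeline state for the eviction task: tracks when this timeline last imitated the
/// layer accesses that are normally hidden behind cached/derived values (see
/// `imitate_timeline_cached_layer_accesses`).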
#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}

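/// Per-tenant state for the eviction task: tracks when any timeline of this tenant last ran
/// the tenant-wide synthetic size calculation imitation (see
/// `imitate_synthetic_size_calculation_worker`).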
#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            Some(self.tenant_shard_id),
            Some(self.timeline_id),
            &format!(
                "layer eviction for {}/{}",
                self.tenant_shard_id, self.timeline_id
            ),
            false,
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(cancel).await;
                Ok(())
            },
        );
    }

    #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
        use crate::tenant::tasks::random_init_delay;
        {
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if random_init_delay(period, &cancel).await.is_err() {
                return;
            }
        }

        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
        loop {
            let policy = self.get_eviction_policy();
            let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
                    if tokio::time::timeout_at(sleep_until, cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }

    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                let start = Instant::now();
                match self.eviction_iteration_threshold(p, cancel, ctx).await {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                let elapsed = start.elapsed();
                crate::tenant::tasks::warn_when_period_overrun(
                    elapsed,
                    p.period,
                    BackgroundLoopKind::Eviction,
                );
                crate::metrics::EVICTION_ITERATION_DURATION
                    .get_metric_with_label_values(&[
                        &format!("{}", p.period.as_secs()),
                        &format!("{}", p.threshold.as_secs()),
                    ])
                    .unwrap()
                    .observe(elapsed.as_secs_f64());
                ControlFlow::Continue(start + p.period)
            }
        }
    }

    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

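        // Rate-limit this iteration against the other background loops; bail out right away
        // if the task or the timeline is shutting down while we wait for a permit.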
        let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
            BackgroundLoopKind::Eviction,
            ctx,
        );

        let _permit = tokio::select! {
            permit = acquire_permit => permit,
            _ = cancel.cancelled() => return ControlFlow::Break(()),
            _ = self.cancel.cancelled() => return ControlFlow::Break(()),
        };

        // If we evict layers but keep cached values derived from those layers, then
        // we face a storm of on-demand downloads after pageserver restart.
        // The reason is that the restart empties the caches, and so, the values
        // need to be re-computed by accessing layers, which we evicted while the
        // caches were filled.
        //
        // Solutions here would be one of the following:
        // 1. Have a persistent cache.
        // 2. Count every access to a cached value as an access to all the layers
        //    that were accessed to compute the value in the first place.
        // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
        //    get re-computed from layers, thereby counting towards layer access stats.
        // 4. Make the eviction task imitate the layer accesses that typically hit caches.
        //
        // We follow approach (4) here because in Neon prod deployment:
        // - page cache is quite small => high churn => low hit rate
        //   => eviction gets correct access stats
        // - value-level caches such as logical size & repartitioning have a high hit rate,
        //   especially for inactive tenants
        //   => eviction sees zero accesses for these
        //   => they cause the on-demand download storm on pageserver restart
        //
        // We should probably move to persistent caches in the future, or avoid
        // having inactive tenants attached to pageserver in the first place.
        match self.imitate_layer_accesses(p, cancel, ctx).await {
            ControlFlow::Break(()) => return ControlFlow::Break(()),
            ControlFlow::Continue(()) => (),
        }

        #[allow(dead_code)]
        #[derive(Debug, Default)]
        struct EvictionStats {
            candidates: usize,
            evicted: usize,
            errors: usize,
            not_evictable: usize,
            skipped_for_shutdown: usize,
        }

        let mut stats = EvictionStats::default();
        // Gather layers for eviction.
        // NB: all the checks can be invalidated as soon as we release the layer map lock.
        // We don't want to hold the layer map lock during eviction, so we just have to
        // deal with that possibility.

        if self.remote_client.is_none() {
            error!("no remote storage configured, cannot evict layers");
            return ControlFlow::Continue(());
        }

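        // Spawn one eviction future per candidate layer into a JoinSet so that candidates
        // are evicted concurrently; outcomes are tallied into `stats` further below.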
        let mut js = tokio::task::JoinSet::new();
        {
            let guard = self.layers.read().await;
            let layers = guard.layer_map();
            for hist_layer in layers.iter_historic_layers() {
                let hist_layer = guard.get_from_desc(&hist_layer);

                // guard against eviction while we inspect it; it might be that eviction_task and
                // disk_usage_eviction_task both select the same layers to be evicted, and
                // seemingly free up double the space. both succeeding is of no consequence.
                let guard = match hist_layer.keep_resident().await {
                    Ok(Some(l)) => l,
                    Ok(None) => continue,
                    Err(e) => {
                        // these should not happen, but we cannot make them statically impossible right
                        // now.
                        tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
                        continue;
                    }
                };

                let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
                    // We only use this fallback if there's an implementation error.
                    // `latest_activity` already does a rate-limited warn!() log.
                    debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
                    SystemTime::now()
                });

                let no_activity_for = match now.duration_since(last_activity_ts) {
                    Ok(d) => d,
                    Err(_e) => {
                        // We reach here if `now` < `last_activity_ts`, which can legitimately
                        // happen if there is an access between us getting `now`, and us getting
                        // the access stats from the layer.
                        //
                        // The other reason why it can happen is system clock skew because
                        // SystemTime::now() is not monotonic, so, even if there is no access
                        // to the layer after we get `now` at the beginning of this function,
                        // it could be that `now` < `last_activity_ts`.
                        //
                        // To distinguish the cases, we would need to record `Instant`s in the
                        // access stats (i.e., monotonic timestamps). But then the timestamp
                        // values in the access stats would be `Instant`s, and hence meaningless
                        // outside of the pageserver process.
                        // At the time of writing, the trade-off is that access stats are more
                        // valuable than detecting clock skew.
                        continue;
                    }
                };
                let layer = guard.drop_eviction_guard();
                if no_activity_for > p.threshold {
                    // this could cause a lot of allocations in some cases
                    js.spawn(async move { layer.evict_and_wait().await });
                    stats.candidates += 1;
                }
            }
        };

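        // Drain the JoinSet and classify each outcome into the stats buckets. On cancellation,
        // the select! below simply drops the JoinSet, which aborts any evictions still in flight.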
        let join_all = async move {
            while let Some(next) = js.join_next().await {
                match next {
                    Ok(Ok(())) => stats.evicted += 1,
                    Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
                        stats.not_evictable += 1;
                    }
                    Err(je) if je.is_cancelled() => unreachable!("not used"),
                    Err(je) if je.is_panic() => {
                        /* already logged */
                        stats.errors += 1;
                    }
                    Err(je) => tracing::error!("unknown JoinError: {je:?}"),
                }
            }
            stats
        };

        tokio::select! {
            stats = join_all => {
                if stats.candidates == stats.not_evictable {
                    debug!(stats=?stats, "eviction iteration complete");
                } else if stats.errors > 0 || stats.not_evictable > 0 {
                    warn!(stats=?stats, "eviction iteration complete");
                } else {
                    info!(stats=?stats, "eviction iteration complete");
                }
            }
            _ = cancel.cancelled() => {
                // just drop the joinset to "abort"
            }
        }

        ControlFlow::Continue(())
    }

    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        if !self.tenant_shard_id.is_zero() {
            // Shards != 0 do not maintain accurate relation sizes, and do not need to calculate logical size
            // for consumption metrics (consumption metrics are only sent from shard 0). We may therefore
            // skip imitating logical size accesses for eviction purposes.
            return ControlFlow::Continue(());
        }

        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only run the layer access imitation approximately as often as the threshold. A little
        // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(ctx).await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id, true) {
            Ok(t) => t,
            Err(_) => {
                return ControlFlow::Break(());
            }
        };
        let mut state = tenant.eviction_task_tenant_state.lock().await;
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
        let lsn = self.get_last_record_lsn();

        // imitate the on-restart initial logical size calculation
        let size = self
            .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate the repartitioning done on the first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, we probably failed logical size because these use the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                match e {
                    CollectKeySpaceError::Cancelled => {
                        // Shutting down, ignore
                    }
                    err => {
                        warn!(
                            "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
                        );
                    }
                }
            }
        }
    }

    // Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Arc<Tenant>,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver. (Size calculation is somewhat expensive in terms of CPU and IOs.)
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would have twice the number of concurrent size calculations.
        // But in practice, `p.threshold` >> `consumption metric interval`, and
        // we spread out the eviction task using `random_init_delay`.
        // So, the chance of the worst case is quite low in practice.
        // The synthetic size calculation is a per-tenant task, but eviction_task.rs runs
        // per-timeline. So, we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

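        // A fresh, throwaway cache: we only care about the layer accesses the size calculation
        // performs, not about reusing its results.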
        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            cancel,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    Err(e) => {
                        // We don't care about the result, but, if it failed, we should log it,
                        // since the consumption metrics worker might be hitting the cached value
                        // and thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}