//! The per-timeline layer eviction task, which evicts data that has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!   - initial logical size
//!   - partitioning
//!   - (other currently missing unknowns)
//!
//! Items in parentheses are not (yet) touched by this task.
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
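//!
//! At a glance, the per-layer decision this task makes is roughly the following.
//! This is a simplified sketch with schematic names, not the exact control flow;
//! the real loop below also rate-limits itself, imitates accesses to cached
//! derived values first, and handles cancellation:
//!
//! ```ignore
//! // For each resident layer of the timeline:
//! let no_activity_for = SystemTime::now().duration_since(last_activity_ts)?;
//! if no_activity_for > policy.threshold {
//!     // Evict the local layer file; the layer remains in remote storage and is
//!     // re-downloaded on demand if it is accessed again.
//!     layer.evict_and_wait(&remote_client).await?;
//! }
//! ```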
use std::{
    collections::HashMap,
    ops::ControlFlow,
    sync::Arc,
    time::{Duration, SystemTime},
};

use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},
    pgdatadir_mapping::CollectKeySpaceError,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
        tasks::BackgroundLoopKind,
        timeline::EvictionError,
        LogicalSizeCalculationCause, Tenant,
    },
};

use utils::completion;

use super::Timeline;

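/// Per-timeline state of the eviction task: when this timeline last imitated
/// accesses to its cached derived values (see `imitate_layer_accesses`).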
#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}

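/// Per-tenant state shared by the eviction tasks of this tenant's timelines:
/// when the tenant-scoped access imitation (synthetic size inputs) last ran.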
#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
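    /// Launch the per-timeline eviction background task. The task waits for
    /// `background_tasks_can_start` (if given) before doing any work.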
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            Some(self.tenant_shard_id),
            Some(self.timeline_id),
            &format!(
                "layer eviction for {}/{}",
                self.tenant_shard_id, self.timeline_id
            ),
            false,
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(cancel).await;
                Ok(())
            },
        );
    }

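    /// Main loop of the eviction task: sleep for a random initial delay, then run
    /// one eviction iteration per policy period until the task is cancelled.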
    #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
        use crate::tenant::tasks::random_init_delay;
        {
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if random_init_delay(period, &cancel).await.is_err() {
                return;
            }
        }

        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
        loop {
            let policy = self.get_eviction_policy();
            let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
                    if tokio::time::timeout_at(sleep_until, cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }

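    /// Run one eviction iteration for the given policy. Returns `Break` when we are
    /// shutting down, or `Continue` with the instant at which the next iteration is due.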
    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                let start = Instant::now();
                match self.eviction_iteration_threshold(p, cancel, ctx).await {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                let elapsed = start.elapsed();
                crate::tenant::tasks::warn_when_period_overrun(
                    elapsed,
                    p.period,
                    BackgroundLoopKind::Eviction,
                );
                crate::metrics::EVICTION_ITERATION_DURATION
                    .get_metric_with_label_values(&[
                        &format!("{}", p.period.as_secs()),
                        &format!("{}", p.threshold.as_secs()),
                    ])
                    .unwrap()
                    .observe(elapsed.as_secs_f64());
                ControlFlow::Continue(start + p.period)
            }
        }
    }

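    /// One eviction pass under the `LayerAccessThreshold` policy: imitate accesses to
    /// cached derived values, then evict every resident layer whose last recorded
    /// access is older than `p.threshold`.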
    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
            BackgroundLoopKind::Eviction,
            ctx,
        );

        let _permit = tokio::select! {
            permit = acquire_permit => permit,
            _ = cancel.cancelled() => return ControlFlow::Break(()),
            _ = self.cancel.cancelled() => return ControlFlow::Break(()),
        };

        // If we evict layers but keep cached values derived from those layers, then
        // we face a storm of on-demand downloads after pageserver restart.
        // The reason is that the restart empties the caches, and so, the values
        // need to be re-computed by accessing layers, which we evicted while the
        // caches were filled.
        //
        // Solutions here would be one of the following:
        // 1. Have a persistent cache.
        // 2. Count every access to a cached value towards the access stats of all layers
        //    that were accessed to compute the value in the first place.
        // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
        //    get re-computed from layers, thereby counting towards layer access stats.
        // 4. Make the eviction task imitate the layer accesses that typically hit caches.
        //
        // We follow approach (4) here because in Neon prod deployment:
        // - page cache is quite small => high churn => low hit rate
        //   => eviction gets correct access stats
        // - value-level caches such as logical size & repartitioning have a high hit rate,
        //   especially for inactive tenants
        //   => eviction sees zero accesses for these
        //   => they cause the on-demand download storm on pageserver restart
        //
        // We should probably move to persistent caches in the future, or avoid
        // having inactive tenants attached to pageserver in the first place.
        match self.imitate_layer_accesses(p, cancel, ctx).await {
            ControlFlow::Break(()) => return ControlFlow::Break(()),
            ControlFlow::Continue(()) => (),
        }

        #[allow(dead_code)]
        #[derive(Debug, Default)]
        struct EvictionStats {
            candidates: usize,
            evicted: usize,
            errors: usize,
            not_evictable: usize,
            skipped_for_shutdown: usize,
        }

        let mut stats = EvictionStats::default();
        // Gather layers for eviction.
        // NB: all the checks can be invalidated as soon as we release the layer map lock,
        // but we don't want to hold the lock during eviction, so we accept that the
        // decision may be stale by the time we act on it.
        let remote_client = match self.remote_client.as_ref() {
            Some(c) => c,
            None => {
                error!("no remote storage configured, cannot evict layers");
                return ControlFlow::Continue(());
            }
        };

        let mut js = tokio::task::JoinSet::new();
        {
            let guard = self.layers.read().await;
            let layers = guard.layer_map();
            for hist_layer in layers.iter_historic_layers() {
                let hist_layer = guard.get_from_desc(&hist_layer);

                // Guard against eviction while we inspect the layer; it might be that eviction_task and
                // disk_usage_eviction_task both select the same layers to be evicted, and
                // seemingly free up double the space. Both succeeding is of no consequence.
                let guard = match hist_layer.keep_resident().await {
                    Ok(Some(l)) => l,
                    Ok(None) => continue,
                    Err(e) => {
                        // These should not happen, but we cannot make them statically impossible right
                        // now.
                        tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
                        continue;
                    }
                };

                let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
                    // We only use this fallback if there's an implementation error.
                    // `latest_activity` already does rate-limited warn!() log.
                    debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
                    SystemTime::now()
                });

                let no_activity_for = match now.duration_since(last_activity_ts) {
                    Ok(d) => d,
                    Err(_e) => {
                        // We reach here if `now` < `last_activity_ts`, which can legitimately
                        // happen if there is an access between us getting `now`, and us getting
                        // the access stats from the layer.
                        //
                        // The other reason why it can happen is system clock skew because
                        // SystemTime::now() is not monotonic, so, even if there is no access
                        // to the layer after we get `now` at the beginning of this function,
                        // it could be that `now` < `last_activity_ts`.
                        //
                        // To distinguish the cases, we would need to record `Instant`s in the
                        // access stats (i.e., monotonic timestamps), but then the timestamp
                        // values in the access stats would be meaningless outside of the
                        // pageserver process.
                        // At the time of writing, the trade-off is that access stats are more
                        // valuable than detecting clock skew.
                        continue;
                    }
                };
                let layer = guard.drop_eviction_guard();
                if no_activity_for > p.threshold {
                    let remote_client = remote_client.clone();
                    // This could cause a lot of allocations in some cases.
                    js.spawn(async move { layer.evict_and_wait(&remote_client).await });
                    stats.candidates += 1;
                }
            }
        };
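
        // Wait for the spawned evictions to finish and tally their results into `stats`.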
        let join_all = async move {
            while let Some(next) = js.join_next().await {
                match next {
                    Ok(Ok(())) => stats.evicted += 1,
                    Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
                        stats.not_evictable += 1;
                    }
                    Err(je) if je.is_cancelled() => unreachable!("not used"),
                    Err(je) if je.is_panic() => {
                        /* already logged */
                        stats.errors += 1;
                    }
                    Err(je) => tracing::error!("unknown JoinError: {je:?}"),
                }
            }
            stats
        };

        tokio::select! {
            stats = join_all => {
                if stats.candidates == stats.not_evictable {
                    debug!(stats=?stats, "eviction iteration complete");
                } else if stats.errors > 0 || stats.not_evictable > 0 {
                    warn!(stats=?stats, "eviction iteration complete");
                } else {
                    info!(stats=?stats, "eviction iteration complete");
                }
            }
            _ = cancel.cancelled() => {
                // just drop the joinset to "abort"
            }
        }

        ControlFlow::Continue(())
    }

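    /// Access the cached values derived from layer contents (initial logical size,
    /// partitioning, synthetic size inputs) so that the underlying layers' access
    /// stats reflect those uses and the layers are not evicted only to be
    /// re-downloaded right after a restart.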
    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only run the layer-access imitation approximately as often as the threshold; slightly
        // more frequently, so that this period does not race with the (threshold/period)-th eviction iteration.
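        // Illustrative numbers (not defaults): with threshold = 24h and period = 10min,
        // imitation runs at most once every 23h50m; if period >= threshold, we fall back
        // to imitating once per threshold.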
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(ctx).await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id, true) {
            Ok(t) => t,
            Err(_) => {
                return ControlFlow::Break(());
            }
        };
        let mut state = tenant.eviction_task_tenant_state.lock().await;
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
        let lsn = self.get_last_record_lsn();

        // imitate the on-restart initial logical size calculation
        let size = self
            .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate the repartitioning done on the first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, we probably also failed logical size because these use the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                match e {
                    CollectKeySpaceError::Cancelled => {
                        // Shutting down, ignore
                    }
                    err => {
                        warn!(
                            "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
                        );
                    }
                }
            }
        }
    }

    /// Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Arc<Tenant>,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver (size calculation is somewhat expensive in terms of CPU and IOs).
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would have twice the number of concurrent size calculations.
        // But in practice, `p.threshold` >> `consumption metric interval`, and
        // we spread out the eviction tasks using `random_init_delay`,
        // so the chance of hitting the worst case is quite low.
        //
        // The synthetic size calculation is tenant-scoped, while this eviction task is
        // per-timeline, so we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            cancel,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    Err(e) => {
                        // We don't care about the result, but, if it failed, we should log it,
                        // since the consumption metrics might be hitting the cached value and
                        // thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}