//! The per-timeline layer eviction task, which evicts data which has not been accessed for more
//! than a given threshold.
//!
//! Data includes all kinds of caches, namely:
//! - (in-memory layers)
//! - on-demand downloaded layer files on disk
//! - (cached layer file pages)
//! - derived data from layer file contents, namely:
//!     - initial logical size
//!     - partitioning
//!     - (other currently missing unknowns)
//!
//! Items with parentheses are not (yet) touched by this task.
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
use std::{
    collections::HashMap,
    ops::ControlFlow,
    sync::Arc,
    time::{Duration, SystemTime},
};

use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
        storage_layer::PersistentLayer,
        timeline::EvictionError,
        LogicalSizeCalculationCause, Tenant,
    },
};

use utils::completion;

use super::Timeline;

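/// Per-timeline state of the eviction task: records when this timeline last ran its
/// layer access imitation, so that it is not repeated more often than the threshold requires.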
#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}

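/// Per-tenant state of the eviction task: records when the tenant-scoped imitation
/// (the synthetic size calculation) last ran, shared across the tenant's timelines.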
#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
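    /// Spawn this timeline's eviction task onto the background runtime.
    /// If `background_tasks_can_start` is given, the spawned task first waits for that
    /// barrier (or for cancellation) before it starts iterating.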
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
        background_tasks_can_start: Option<&completion::Barrier>,
    ) {
        let self_clone = Arc::clone(self);
        let background_tasks_can_start = background_tasks_can_start.cloned();
        task_mgr::spawn(
            BACKGROUND_RUNTIME.handle(),
            TaskKind::Eviction,
            Some(self.tenant_id),
            Some(self.timeline_id),
            &format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id),
            false,
            async move {
                let cancel = task_mgr::shutdown_token();
                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()); }
                    _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
                };

                self_clone.eviction_task(cancel).await;
                Ok(())
            },
        );
    }

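    /// Body of the eviction task: apply a randomized initial delay, then repeatedly run
    /// eviction iterations under the currently configured eviction policy until cancelled.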
    #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
    async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
        use crate::tenant::tasks::random_init_delay;
        {
            let policy = self.get_eviction_policy();
            let period = match policy {
                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
                EvictionPolicy::NoEviction => Duration::from_secs(10),
            };
            if random_init_delay(period, &cancel).await.is_err() {
                return;
            }
        }

        let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
        loop {
            let policy = self.get_eviction_policy();
            let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;

            match cf {
                ControlFlow::Break(()) => break,
                ControlFlow::Continue(sleep_until) => {
                    if tokio::time::timeout_at(sleep_until, cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        }
    }

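    /// Run a single eviction iteration and return when the next one should start.
    /// `NoEviction` only schedules a re-check of the policy after a short sleep;
    /// `LayerAccessThreshold` performs threshold-based eviction and records its duration
    /// in the `EVICTION_ITERATION_DURATION` metric.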
    #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
    async fn eviction_iteration(
        self: &Arc<Self>,
        policy: &EvictionPolicy,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<(), Instant> {
        debug!("eviction iteration: {policy:?}");
        match policy {
            EvictionPolicy::NoEviction => {
                // check again in 10 seconds; XXX config watch mechanism
                ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
            }
            EvictionPolicy::LayerAccessThreshold(p) => {
                let start = Instant::now();
                match self.eviction_iteration_threshold(p, cancel, ctx).await {
                    ControlFlow::Break(()) => return ControlFlow::Break(()),
                    ControlFlow::Continue(()) => (),
                }
                let elapsed = start.elapsed();
                crate::tenant::tasks::warn_when_period_overrun(elapsed, p.period, "eviction");
                crate::metrics::EVICTION_ITERATION_DURATION
                    .get_metric_with_label_values(&[
                        &format!("{}", p.period.as_secs()),
                        &format!("{}", p.threshold.as_secs()),
                    ])
                    .unwrap()
                    .observe(elapsed.as_secs_f64());
                ControlFlow::Continue(start + p.period)
            }
        }
    }

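    /// One threshold-based eviction pass: first imitate the layer accesses that cached
    /// values would otherwise hide from the access stats, then evict resident layers whose
    /// last recorded activity is older than `p.threshold`.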
    async fn eviction_iteration_threshold(
        self: &Arc<Self>,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let now = SystemTime::now();

        // If we evict layers but keep cached values derived from those layers, then
        // we face a storm of on-demand downloads after pageserver restart.
        // The reason is that the restart empties the caches, and so, the values
        // need to be re-computed by accessing layers, which we evicted while the
        // caches were filled.
        //
        // Solutions here would be one of the following:
        // 1. Have a persistent cache.
        // 2. Count every access to a cached value towards the access stats of all layers
        //    that were accessed to compute the value in the first place.
        // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
        //    get re-computed from layers, thereby counting towards layer access stats.
        // 4. Make the eviction task imitate the layer accesses that typically hit caches.
        //
        // We follow approach (4) here because in the Neon prod deployment:
        // - the page cache is quite small => high churn => low hit rate
        //   => eviction gets correct access stats
        // - value-level caches such as logical size & partitioning have a high hit rate,
        //   especially for inactive tenants
        //   => eviction sees zero accesses for these
        //   => they cause the on-demand download storm on pageserver restart
        //
        // We should probably move to persistent caches in the future, or avoid
        // having inactive tenants attached to pageserver in the first place.
        match self.imitate_layer_accesses(p, cancel, ctx).await {
            ControlFlow::Break(()) => return ControlFlow::Break(()),
            ControlFlow::Continue(()) => (),
        }

        #[allow(dead_code)]
        #[derive(Debug, Default)]
        struct EvictionStats {
            candidates: usize,
            evicted: usize,
            errors: usize,
            not_evictable: usize,
            skipped_for_shutdown: usize,
        }

        let mut stats = EvictionStats::default();
        // Gather layers for eviction.
        // NB: all the checks can be invalidated as soon as we release the layer map lock.
        // We don't want to hold the layer map lock during eviction.
        // So, we just need to deal with this.
        let candidates: Vec<Arc<dyn PersistentLayer>> = {
            let guard = self.layers.read().await;
            let layers = guard.layer_map();
            let mut candidates = Vec::new();
            for hist_layer in layers.iter_historic_layers() {
                let hist_layer = guard.get_from_desc(&hist_layer);
                if hist_layer.is_remote_layer() {
                    continue;
                }

                let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
                    // We only use this fallback if there's an implementation error.
                    // `latest_activity` already does a rate-limited warn!() log.
                    debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
                    SystemTime::now()
                });

                let no_activity_for = match now.duration_since(last_activity_ts) {
                    Ok(d) => d,
                    Err(_e) => {
                        // We reach here if `now` < `last_activity_ts`, which can legitimately
                        // happen if there is an access between us getting `now`, and us getting
                        // the access stats from the layer.
                        //
                        // The other reason why it can happen is system clock skew because
                        // SystemTime::now() is not monotonic, so, even if there is no access
                        // to the layer after we get `now` at the beginning of this function,
                        // it could be that `now` < `last_activity_ts`.
                        //
                        // To distinguish the cases, we would need to record `Instant`s in the
                        // access stats (i.e., monotonic timestamps), but then the timestamp
                        // values would be meaningless outside of the pageserver process.
                        // At the time of writing, the trade-off is that access stats are more
                        // valuable than detecting clock skew.
                        continue;
                    }
                };
                if no_activity_for > p.threshold {
                    candidates.push(hist_layer)
                }
            }
            candidates
        };
        stats.candidates = candidates.len();

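        // Eviction needs remote storage: an evicted layer file must remain downloadable
        // on demand, so without a remote client we cannot evict anything.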
        let remote_client = match self.remote_client.as_ref() {
            None => {
                error!(
                    num_candidates = candidates.len(),
                    "no remote storage configured, cannot evict layers"
                );
                return ControlFlow::Continue(());
            }
            Some(c) => c,
        };

        let results = match self
            .evict_layer_batch(remote_client, &candidates[..], cancel.clone())
            .await
        {
            Err(pre_err) => {
                stats.errors += candidates.len();
                error!("could not do any evictions: {pre_err:#}");
                return ControlFlow::Continue(());
            }
            Ok(results) => results,
        };
        assert_eq!(results.len(), candidates.len());
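        // Tally the per-layer outcomes; a `None` result means the batch was cut short by shutdown.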
        for (l, result) in candidates.iter().zip(results) {
            match result {
                None => {
                    stats.skipped_for_shutdown += 1;
                }
                Some(Ok(())) => {
                    stats.evicted += 1;
                }
                Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
                    stats.not_evictable += 1;
                }
                Some(Err(EvictionError::FileNotFound)) => {
                    // compaction/gc removed the file while we were waiting on layer_removal_cs
                    stats.not_evictable += 1;
                }
                Some(Err(
                    e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
                )) => {
                    let e = utils::error::report_compact_sources(&e);
                    warn!(layer = %l, "failed to evict layer: {e}");
                    stats.not_evictable += 1;
                }
            }
        }
        if stats.candidates == stats.not_evictable {
            debug!(stats=?stats, "eviction iteration complete");
        } else if stats.errors > 0 || stats.not_evictable > 0 {
            warn!(stats=?stats, "eviction iteration complete");
        } else {
            info!(stats=?stats, "eviction iteration complete");
        }
        ControlFlow::Continue(())
    }

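    /// Imitate the layer accesses that hit caches (logical size, partitioning, synthetic size)
    /// so that the access stats reflect them. The timeline-scoped part runs per timeline;
    /// the tenant-scoped synthetic size imitation is run by whichever of the tenant's
    /// timelines gets there first in each period.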
    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only run the layer access imitation approximately as often as the threshold, erring
        // on the side of a little more frequently, to avoid this period racing with the
        // threshold/period-th eviction iteration.
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(cancel, ctx)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let Ok(tenant) = crate::tenant::mgr::get_tenant(self.tenant_id, true).await else {
            // likely, we're shutting down
            return ControlFlow::Break(());
        };
        let mut state = tenant.eviction_task_tenant_state.lock().await;
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(
        &self,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) {
        let lsn = self.get_last_record_lsn();

        // imitate the on-restart initial logical size calculation
        let size = self
            .calculate_logical_size(
                lsn,
                LogicalSizeCalculationCause::EvictionTaskImitation,
                cancel.clone(),
                ctx,
            )
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate the repartitioning done on first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, we probably failed logical size because these use the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                warn!(
                    "failed to collect keyspace but succeeded in calculating logical size: {e:#}"
                );
            }
        }
    }

    /// Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Arc<Tenant>,
        ctx: &RequestContext,
        cancel: &CancellationToken,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver. (Size calculation is somewhat expensive in terms of CPU and IOs.)
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would have twice the number of concurrent size calculations.
        // But in practice, `p.threshold` >> the consumption metric interval, and
        // we spread out the eviction task using `random_init_delay`.
        // So, the chance of the worst case is quite low in practice.
        // The synthetic size calculation is a per-tenant operation, but eviction_task.rs
        // runs per-timeline, so we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    Err(e) => {
                        // We don't care about the result, but, if it failed, we should log it,
                        // since the consumption metrics worker might be hitting the cached value
                        // and thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}