Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::{
5 : HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
6 : };
7 : use pageserver_api::shard::ShardIndex;
8 : use std::ops::Range;
9 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10 : use std::sync::{Arc, Weak};
11 : use std::time::{Duration, SystemTime};
12 : use tracing::Instrument;
13 : use utils::lsn::Lsn;
14 : use utils::sync::heavier_once_cell;
15 :
16 : use crate::config::PageServerConf;
17 : use crate::context::RequestContext;
18 : use crate::repository::Key;
19 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
20 : use crate::tenant::timeline::GetVectoredError;
21 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
22 :
23 : use super::delta_layer::{self, DeltaEntry};
24 : use super::image_layer;
25 : use super::{
26 : AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerFileName, PersistentLayerDesc,
27 : ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
28 : };
29 :
30 : use utils::generation::Generation;
31 :
32 : #[cfg(test)]
33 : mod tests;
34 :
35 : #[cfg(test)]
36 : mod failpoints;
37 :
38 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
39 : /// range of LSNs.
40 : ///
41 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
42 : /// layers are used to ingest incoming WAL, and provide fast access to the
43 : /// recent page versions. On-disk layers are stored as files on disk, and are
44 : /// immutable. This type represents the on-disk kind, while the in-memory kind is represented by
45 : /// [`InMemoryLayer`].
46 : ///
47 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
48 : /// A delta layer contains all modifications within a range of LSNs and keys.
49 : /// An image layer is a snapshot of all the data in a key-range, at a single
50 : /// LSN.
51 : ///
52 : /// This type models the on-disk layers, which can be evicted and on-demand downloaded. As a
53 : /// general goal, read accesses should always win eviction and eviction should not wait for
54 : /// download.
55 : ///
56 : /// ### State transitions
57 : ///
58 : /// The internal state of `Layer` is composed of, most importantly, the on-filesystem state and the
59 : /// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
60 : /// right size) or deleted.
61 : ///
62 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
63 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
64 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
65 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file, thus
66 : /// cancelling the eviction.
67 : ///
68 : /// ```text
69 : /// +-----------------+   get_or_maybe_download    +--------------------------------+
70 : /// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
71 : /// |     ENOENT      |                         /->|                                |
72 : /// +-----------------+                         |  +--------------------------------+
73 : ///                 ^                           |                         |       ^
74 : ///                 |    get_or_maybe_download  |                         |       | get_or_maybe_download, either:
75 : ///  evict_blocking | /-------------------------/                         |       | - upgrade weak to strong
76 : ///                 | |                                                   |       | - re-initialize without download
77 : ///                 | |                                    evict_and_wait |       |
78 : /// +-----------------+                                                   v       |
79 : /// | not initialized |  on_downloaded_layer_drop  +--------------------------------------+
80 : /// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
81 : /// +-----------------+                            +--------------------------------------+
82 : /// ```
83 : ///
84 : /// ### Unsupported
85 : ///
86 : /// - Evicting by the operator deleting files from the filesystem
87 : ///
88 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
89 : #[derive(Clone)]
90 : pub(crate) struct Layer(Arc<LayerInner>);
91 :
92 : impl std::fmt::Display for Layer {
93 1390 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 1390 : if matches!(self.0.generation, Generation::Broken) {
95 0 : write!(f, "{}-broken", self.layer_desc().short_id())
96 : } else {
97 1390 : write!(
98 1390 : f,
99 1390 : "{}{}",
100 1390 : self.layer_desc().short_id(),
101 1390 : self.0.generation.get_suffix()
102 1390 : )
103 : }
104 1390 : }
105 : }
106 :
107 : impl std::fmt::Debug for Layer {
108 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 0 : write!(f, "{}", self)
110 0 : }
111 : }
112 :
113 : impl AsLayerDesc for Layer {
114 383450 : fn layer_desc(&self) -> &PersistentLayerDesc {
115 383450 : self.0.layer_desc()
116 383450 : }
117 : }
118 :
119 : impl Layer {
120 : /// Creates a layer value for a file we know to not be resident.
121 0 : pub(crate) fn for_evicted(
122 0 : conf: &'static PageServerConf,
123 0 : timeline: &Arc<Timeline>,
124 0 : file_name: LayerFileName,
125 0 : metadata: LayerFileMetadata,
126 0 : ) -> Self {
127 0 : let desc = PersistentLayerDesc::from_filename(
128 0 : timeline.tenant_shard_id,
129 0 : timeline.timeline_id,
130 0 : file_name,
131 0 : metadata.file_size(),
132 0 : );
133 0 :
134 0 : let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
135 0 :
136 0 : let owner = Layer(Arc::new(LayerInner::new(
137 0 : conf,
138 0 : timeline,
139 0 : access_stats,
140 0 : desc,
141 0 : None,
142 0 : metadata.generation,
143 0 : metadata.shard,
144 0 : )));
145 0 :
146 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
147 :
148 0 : owner
149 0 : }
150 :
151 : /// Creates a Layer value for a file we know to be resident in timeline directory.
152 24 : pub(crate) fn for_resident(
153 24 : conf: &'static PageServerConf,
154 24 : timeline: &Arc<Timeline>,
155 24 : file_name: LayerFileName,
156 24 : metadata: LayerFileMetadata,
157 24 : ) -> ResidentLayer {
158 24 : let desc = PersistentLayerDesc::from_filename(
159 24 : timeline.tenant_shard_id,
160 24 : timeline.timeline_id,
161 24 : file_name,
162 24 : metadata.file_size(),
163 24 : );
164 24 :
165 24 : let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
166 24 :
167 24 : let mut resident = None;
168 24 :
169 24 : let owner = Layer(Arc::new_cyclic(|owner| {
170 24 : let inner = Arc::new(DownloadedLayer {
171 24 : owner: owner.clone(),
172 24 : kind: tokio::sync::OnceCell::default(),
173 24 : version: 0,
174 24 : });
175 24 : resident = Some(inner.clone());
176 24 :
177 24 : LayerInner::new(
178 24 : conf,
179 24 : timeline,
180 24 : access_stats,
181 24 : desc,
182 24 : Some(inner),
183 24 : metadata.generation,
184 24 : metadata.shard,
185 24 : )
186 24 : }));
187 24 :
188 24 : let downloaded = resident.expect("just initialized");
189 24 :
190 24 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
191 :
192 24 : timeline
193 24 : .metrics
194 24 : .resident_physical_size_add(metadata.file_size());
195 24 :
196 24 : ResidentLayer { downloaded, owner }
197 24 : }
198 :
199 : /// Creates a Layer value for freshly written out new layer file by renaming it from a
200 : /// temporary path.
201 942 : pub(crate) fn finish_creating(
202 942 : conf: &'static PageServerConf,
203 942 : timeline: &Arc<Timeline>,
204 942 : desc: PersistentLayerDesc,
205 942 : temp_path: &Utf8Path,
206 942 : ) -> anyhow::Result<ResidentLayer> {
207 942 : let mut resident = None;
208 942 :
209 942 : let owner = Layer(Arc::new_cyclic(|owner| {
210 942 : let inner = Arc::new(DownloadedLayer {
211 942 : owner: owner.clone(),
212 942 : kind: tokio::sync::OnceCell::default(),
213 942 : version: 0,
214 942 : });
215 942 : resident = Some(inner.clone());
216 942 : let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
217 942 : access_stats.record_residence_event(
218 942 : LayerResidenceStatus::Resident,
219 942 : LayerResidenceEventReason::LayerCreate,
220 942 : );
221 942 : LayerInner::new(
222 942 : conf,
223 942 : timeline,
224 942 : access_stats,
225 942 : desc,
226 942 : Some(inner),
227 942 : timeline.generation,
228 942 : timeline.get_shard_index(),
229 942 : )
230 942 : }));
231 942 :
232 942 : let downloaded = resident.expect("just initialized");
233 942 :
234 942 : // if the rename works, the path is as expected
235 942 : // TODO: sync system call
236 942 : std::fs::rename(temp_path, owner.local_path())
237 942 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
238 :
239 942 : Ok(ResidentLayer { downloaded, owner })
240 942 : }
241 :
242 : /// Requests the layer to be evicted and waits for this to be done.
243 : ///
244 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
245 : ///
246 : /// If for a bad luck or blocking of the executor, we miss the actual eviction and the layer is
247 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
248 : ///
249 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
250 : /// will happen regardless the future returned by this method completing unless there is a
251 : /// read access before eviction gets to complete.
252 : ///
253 : /// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
254 : /// of download-evict cycle on retry.
255 34 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
256 46 : self.0.evict_and_wait(timeout).await
257 32 : }
258 :
259 : /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
260 : /// then.
261 : ///
262 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
263 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
264 : /// the value this is called on gets dropped.
265 : ///
266 : /// This is ensured by both of those methods accepting references to Layer.
267 : ///
268 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
269 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
270 450 : pub(crate) fn delete_on_drop(&self) {
271 450 : self.0.delete_on_drop();
272 450 : }
273 :
274 : /// Return data needed to reconstruct given page at LSN.
275 : ///
276 : /// It is up to the caller to collect more data from the previous layer and
277 : /// perform WAL redo, if necessary.
278 : ///
279 : /// # Cancellation-Safety
280 : ///
281 : /// This method is cancellation-safe.
282 124542 : pub(crate) async fn get_value_reconstruct_data(
283 124542 : &self,
284 124542 : key: Key,
285 124542 : lsn_range: Range<Lsn>,
286 124542 : reconstruct_data: &mut ValueReconstructState,
287 124542 : ctx: &RequestContext,
288 124542 : ) -> anyhow::Result<ValueReconstructResult> {
289 : use anyhow::ensure;
290 :
291 124542 : let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
292 124542 : self.0
293 124542 : .access_stats
294 124542 : .record_access(LayerAccessKind::GetValueReconstructData, ctx);
295 124542 :
296 124542 : if self.layer_desc().is_delta {
297 124016 : ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
298 124016 : ensure!(self.layer_desc().key_range.contains(&key));
299 : } else {
300 526 : ensure!(self.layer_desc().key_range.contains(&key));
301 526 : ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
302 526 : ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
303 : }
304 :
305 124542 : layer
306 124542 : .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
307 124542 : .instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
308 23600 : .await
309 124542 : .with_context(|| format!("get_value_reconstruct_data for layer {self}"))
310 124542 : }
311 :
312 18 : pub(crate) async fn get_values_reconstruct_data(
313 18 : &self,
314 18 : keyspace: KeySpace,
315 18 : lsn_range: Range<Lsn>,
316 18 : reconstruct_data: &mut ValuesReconstructState,
317 18 : ctx: &RequestContext,
318 18 : ) -> Result<(), GetVectoredError> {
319 18 : let layer = self
320 18 : .0
321 18 : .get_or_maybe_download(true, Some(ctx))
322 0 : .await
323 18 : .map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;
324 :
325 18 : self.0
326 18 : .access_stats
327 18 : .record_access(LayerAccessKind::GetValueReconstructData, ctx);
328 18 :
329 18 : layer
330 18 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
331 18 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
332 41 : .await
333 18 : }
334 :
335 : /// Download the layer if evicted.
336 : ///
337 : /// Will not error when the layer is already downloaded.
338 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
339 0 : self.0.get_or_maybe_download(true, None).await?;
340 0 : Ok(())
341 0 : }
342 :
/// For a layer which is already downloaded, returns a guard which prohibits eviction for as
/// long as the guard exists.
///
/// Returns `None` if the layer is currently evicted or in the process of becoming evicted.
#[cfg(test)]
pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
    self.0
        .inner
        .get()
        .and_then(|rowe| rowe.get())
        .map(|downloaded| ResidentLayer {
            downloaded,
            owner: self.clone(),
        })
}
356 :
357 : /// Weak indicator of is the layer resident or not. Good enough for eviction, which can deal
358 : /// with `EvictionError::NotFound`.
359 : ///
360 : /// Returns `true` if this layer might be resident, or `false`, if it most likely evicted or
361 : /// will be unless a read happens soon.
362 34 : pub(crate) fn is_likely_resident(&self) -> bool {
363 34 : self.0
364 34 : .inner
365 34 : .get()
366 34 : .map(|rowe| rowe.is_likely_resident())
367 34 : .unwrap_or(false)
368 34 : }
369 :
370 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
371 444 : pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
372 444 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
373 :
374 444 : Ok(ResidentLayer {
375 444 : downloaded,
376 444 : owner: self.clone(),
377 444 : })
378 444 : }
379 :
380 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
381 0 : self.0.info(reset)
382 0 : }
383 :
384 0 : pub(crate) fn access_stats(&self) -> &LayerAccessStats {
385 0 : &self.0.access_stats
386 0 : }
387 :
388 1090 : pub(crate) fn local_path(&self) -> &Utf8Path {
389 1090 : &self.0.path
390 1090 : }
391 :
392 948 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
393 948 : self.0.metadata()
394 948 : }
395 :
396 : /// Traditional debug dumping facility
397 : #[allow(unused)]
398 4 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
399 4 : self.0.desc.dump();
400 4 :
401 4 : if verbose {
402 : // for now, unconditionally download everything, even if that might not be wanted.
403 4 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
404 8 : l.dump(&self.0, ctx).await?
405 0 : }
406 :
407 4 : Ok(())
408 4 : }
409 :
/// Returns a future which completes once this layer has been dropped (and, if needed, local
/// file deletion and remote deletion scheduling have completed).
///
/// This does not start the local deletion, use [`Self::delete_on_drop`] for that
/// separately.
///
/// The future holds only a receiver of the status channel, so it does not keep the layer
/// alive.
#[cfg(any(feature = "testing", test))]
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
    let mut rx = self.0.status.as_ref().unwrap().subscribe();

    async move {
        // status updates are not interesting here; `changed` fails once the sender is gone
        while rx.changed().await.is_ok() {}
    }
}
427 : }
428 :
429 : /// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
430 : ///
431 : /// However, when we want something evicted, we cannot evict it right away as there might be
432 : /// reads currently happening on it. For example: it has been found with [`LayerMap::search`] but
433 : /// not yet read with [`Layer::get_value_reconstruct_data`].
434 : ///
435 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
436 : #[derive(Debug)]
437 : enum ResidentOrWantedEvicted {
/// No eviction is pending; the strong reference is held here.
438 : Resident(Arc<DownloadedLayer>),
/// Eviction has been requested; only a weak reference is held, together with the
/// `DownloadedLayer::version` observed when the eviction was requested.
439 : WantedEvicted(Weak<DownloadedLayer>, usize),
440 : }
441 :
442 : impl ResidentOrWantedEvicted {
/// Non-mutating access to the `DownloadedLayer`, if one is still alive.
///
/// This is not used on the read path (anything that calls
/// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
/// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
#[cfg(test)]
fn get(&self) -> Option<Arc<DownloadedLayer>> {
    match self {
        Self::Resident(strong) => Some(Arc::clone(strong)),
        Self::WantedEvicted(weak, _) => Weak::upgrade(weak),
    }
}
455 :
456 : /// Best-effort query for residency right now, not as strong guarantee as receiving a strong
457 : /// reference from `ResidentOrWantedEvicted::get`.
458 28 : fn is_likely_resident(&self) -> bool {
459 28 : match self {
460 22 : ResidentOrWantedEvicted::Resident(_) => true,
461 6 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
462 : }
463 28 : }
464 :
465 : /// Upgrades any weak to strong if possible.
466 : ///
467 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
468 : /// happened.
469 125012 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
470 125012 : match self {
471 125004 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
472 8 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
473 0 : Some(strong) => {
474 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
475 0 :
476 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
477 0 :
478 0 : Some((strong, true))
479 : }
480 8 : None => None,
481 : },
482 : }
483 125012 : }
484 :
485 : /// When eviction is first requested, drop down to holding a [`Weak`].
486 : ///
487 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
488 : /// drop the possibly last strong reference outside of the mutex of
489 : /// [`heavier_once_cell::OnceCell`].
490 28 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
491 28 : match self {
492 24 : ResidentOrWantedEvicted::Resident(strong) => {
493 24 : let weak = Arc::downgrade(strong);
494 24 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
495 24 : std::mem::swap(self, &mut temp);
496 24 : match temp {
497 24 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
498 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
499 : }
500 : }
501 4 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
502 : }
503 28 : }
504 : }
505 :
/// Shared state behind every clone of a `Layer`.
506 : struct LayerInner {
507 : /// Only needed to check ondemand_download_behavior_treat_error_as_warn and for the creation
508 : /// of [`Self::path`].
509 : conf: &'static PageServerConf,
510 :
511 : /// Full path to the file; unclear if this should exist anymore.
512 : path: Utf8PathBuf,
513 :
/// Descriptor of the layer; this is what [`AsLayerDesc::layer_desc`] returns.
514 : desc: PersistentLayerDesc,
515 :
516 : /// Timeline access is needed for remote timeline client and metrics.
517 : ///
518 : /// There should not be an access to timeline for any reason without entering the
519 : /// [`Timeline::gate`] at the same time.
520 : timeline: Weak<Timeline>,
521 :
522 : /// Cached knowledge of [`Timeline::remote_client`] being `Some`.
523 : have_remote_client: bool,
524 :
/// Access statistics, updated by the read paths.
525 : access_stats: LayerAccessStats,
526 :
527 : /// This custom OnceCell is backed by std mutex, but only held for short time periods.
528 : ///
529 : /// Filesystem changes (download, evict) are only done while holding a permit which the
530 : /// `heavier_once_cell` provides.
531 : ///
532 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
533 : /// possibly read while not holding it.
534 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
535 :
536 : /// Do we want to delete this locally and remotely when `LayerInner` is dropped
537 : wanted_deleted: AtomicBool,
538 :
539 : /// Version is to make sure we will only evict a specific initialization of the downloaded file.
540 : ///
541 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
542 : /// `ResidentOrWantedEvicted::WantedEvicted`.
543 : version: AtomicUsize,
544 :
545 : /// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
546 : /// starts, or completes.
547 : ///
548 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
549 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
550 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
551 : /// [`ResidentOrWantedEvicted::Resident`] on access.
552 : ///
553 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
554 : status: Option<tokio::sync::watch::Sender<Status>>,
555 :
556 : /// Counter for exponential backoff with the download.
557 : ///
558 : /// This is atomic only for the purposes of having additional data only accessed while holding
559 : /// the InitPermit.
560 : consecutive_failures: AtomicUsize,
561 :
562 : /// The generation of this Layer.
563 : ///
564 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
565 : /// for created layers from [`Timeline::generation`].
566 : generation: Generation,
567 :
568 : /// The shard of this Layer.
569 : ///
570 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
571 : /// current `ShardIdentity` (TODO: add link once it's introduced).
572 : ///
573 : /// For loaded layers, this may be some other value if the tenant has undergone
574 : /// a shard split since the layer was originally written.
575 : shard: ShardIndex,
576 :
577 : /// When the Layer was last evicted but has not been downloaded since.
578 : ///
579 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
580 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
581 :
/// Test-only failpoints; starts out empty.
582 : #[cfg(test)]
583 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
584 : }
585 :
586 : impl std::fmt::Display for LayerInner {
587 30 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588 30 : write!(f, "{}", self.layer_desc().short_id())
589 30 : }
590 : }
591 :
592 : impl AsLayerDesc for LayerInner {
593 385710 : fn layer_desc(&self) -> &PersistentLayerDesc {
594 385710 : &self.desc
595 385710 : }
596 : }
597 :
/// Residency status published to subscribers through the `LayerInner::status` watch channel.
598 : #[derive(Debug, Clone, Copy)]
599 : enum Status {
/// Initial value when `LayerInner::new` is handed a `DownloadedLayer`; also sent when a read
/// access upgrades a wanted-evicted layer back to resident.
600 : Resident,
/// Initial value when `LayerInner::new` is not handed a `DownloadedLayer`.
601 : Evicted,
/// NOTE(review): presumably posted when a non-cancellable download starts; the code sending
/// it is not visible from here, confirm.
602 : Downloading,
603 : }
604 :
605 : impl Drop for LayerInner {
606 472 : fn drop(&mut self) {
607 472 : if !*self.wanted_deleted.get_mut() {
608 : // should we try to evict if the last wish was for eviction? seems more like a hazard
609 : // than a clear win.
610 26 : return;
611 446 : }
612 :
613 446 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
614 :
615 446 : let path = std::mem::take(&mut self.path);
616 446 : let file_name = self.layer_desc().filename();
617 446 : let file_size = self.layer_desc().file_size;
618 446 : let timeline = self.timeline.clone();
619 446 : let meta = self.metadata();
620 446 : let status = self.status.take();
621 446 :
622 446 : Self::spawn_blocking(move || {
623 446 : let _g = span.entered();
624 446 :
625 446 : // carry this until we are finished for [`Layer::wait_drop`] support
626 446 : let _status = status;
627 :
628 446 : let Some(timeline) = timeline.upgrade() else {
629 : // no need to nag that timeline is gone: under normal situation on
630 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
631 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
632 0 : return;
633 : };
634 :
635 446 : let Ok(_guard) = timeline.gate.enter() else {
636 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
637 0 : return;
638 : };
639 :
640 446 : let removed = match std::fs::remove_file(path) {
641 444 : Ok(()) => true,
642 2 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
643 2 : // until we no longer do detaches by removing all local files before removing the
644 2 : // tenant from the global map, we will always get these errors even if we knew what
645 2 : // is the latest state.
646 2 : //
647 2 : // we currently do not track the latest state, so we'll also end up here on evicted
648 2 : // layers.
649 2 : false
650 : }
651 0 : Err(e) => {
652 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
653 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
654 0 : false
655 : }
656 : };
657 :
658 446 : if removed {
659 444 : timeline.metrics.resident_physical_size_sub(file_size);
660 444 : }
661 446 : if let Some(remote_client) = timeline.remote_client.as_ref() {
662 446 : let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
663 :
664 446 : if let Err(e) = res {
665 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
666 : // demonstrating this deadlock (without spawn_blocking): stop will drop
667 : // queued items, which will have ResidentLayer's, and those drops would try
668 : // to re-entrantly lock the RemoteTimelineClient inner state.
669 0 : if !timeline.is_active() {
670 0 : tracing::info!("scheduling deletion on drop failed: {e:#}");
671 : } else {
672 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
673 : }
674 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
675 446 : } else {
676 446 : LAYER_IMPL_METRICS.inc_completed_deletes();
677 446 : }
678 0 : }
679 446 : });
680 472 : }
681 : }
682 :
683 : impl LayerInner {
684 966 : fn new(
685 966 : conf: &'static PageServerConf,
686 966 : timeline: &Arc<Timeline>,
687 966 : access_stats: LayerAccessStats,
688 966 : desc: PersistentLayerDesc,
689 966 : downloaded: Option<Arc<DownloadedLayer>>,
690 966 : generation: Generation,
691 966 : shard: ShardIndex,
692 966 : ) -> Self {
693 966 : let path = conf
694 966 : .timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id)
695 966 : .join(desc.filename().to_string());
696 :
697 966 : let (inner, version, init_status) = if let Some(inner) = downloaded {
698 966 : let version = inner.version;
699 966 : let resident = ResidentOrWantedEvicted::Resident(inner);
700 966 : (
701 966 : heavier_once_cell::OnceCell::new(resident),
702 966 : version,
703 966 : Status::Resident,
704 966 : )
705 : } else {
706 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
707 : };
708 :
709 966 : LayerInner {
710 966 : conf,
711 966 : path,
712 966 : desc,
713 966 : timeline: Arc::downgrade(timeline),
714 966 : have_remote_client: timeline.remote_client.is_some(),
715 966 : access_stats,
716 966 : wanted_deleted: AtomicBool::new(false),
717 966 : inner,
718 966 : version: AtomicUsize::new(version),
719 966 : status: Some(tokio::sync::watch::channel(init_status).0),
720 966 : consecutive_failures: AtomicUsize::new(0),
721 966 : generation,
722 966 : shard,
723 966 : last_evicted_at: std::sync::Mutex::default(),
724 966 : #[cfg(test)]
725 966 : failpoints: Default::default(),
726 966 : }
727 966 : }
728 :
729 450 : fn delete_on_drop(&self) {
730 450 : let res =
731 450 : self.wanted_deleted
732 450 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
733 450 :
734 450 : if res.is_ok() {
735 446 : LAYER_IMPL_METRICS.inc_started_deletes();
736 446 : }
737 450 : }
738 :
/// Requests eviction by downgrading the strong reference held in `self.inner` and then waits,
/// for at most `timeout`, for the next status change.
///
/// # Errors
///
/// - [`EvictionError::NotFound`] if the status is not `Status::Resident` on entry, or if
///   `self.inner` is not initialized
/// - [`EvictionError::Timeout`] if no status change is seen within `timeout`
/// - [`EvictionError::Downloaded`] if the status seen after the change is `Status::Resident`
///
/// # Panics
///
/// Panics if the timeline has no remote client.
///
739 : /// Cancellation safe, however dropping the future and calling this method again might result
740 : /// in a new attempt to evict OR join the previously started attempt.
741 136 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
742 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
743 : assert!(self.have_remote_client);
744 :
// subscribe before requesting the eviction, so that the resulting status change is not missed
745 : let mut rx = self.status.as_ref().unwrap().subscribe();
746 :
747 : {
// marks the current value as seen; `changed` below only wakes up for later updates
748 : let current = rx.borrow_and_update();
749 : match &*current {
750 : Status::Resident => {
751 : // we might get lucky and evict this; continue
752 : }
753 : Status::Evicted | Status::Downloading => {
754 : // it is already evicted (and possibly being downloaded again)
755 : return Err(EvictionError::NotFound);
756 : }
757 : }
758 : }
759 :
760 : let strong = {
761 : match self.inner.get() {
762 : Some(mut either) => either.downgrade(),
763 : None => {
764 : // we already have a scheduled eviction, which just has not gotten to run yet.
765 : // it might still race with a read access, but that could also get cancelled,
766 : // so let's say this is not evictable.
767 : return Err(EvictionError::NotFound);
768 : }
769 : }
770 : };
771 :
772 : if strong.is_some() {
773 : // drop the DownloadedLayer outside of holding the guard
774 : drop(strong);
775 :
776 : // idea here is that only one evicter should ever get to witness a strong reference,
777 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
778 : // cancelled eviction and signal us, like it currently does.
779 : //
780 : // a second concurrent evict_and_wait will not see a strong reference.
781 : LAYER_IMPL_METRICS.inc_started_evictions();
782 : }
783 :
784 : let changed = rx.changed();
785 : let changed = tokio::time::timeout(timeout, changed).await;
786 :
// the outer `Err` is the timeout elapsing
787 : let Ok(changed) = changed else {
788 : return Err(EvictionError::Timeout);
789 : };
790 :
791 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
792 :
793 : let current = rx.borrow_and_update();
794 :
795 : match &*current {
796 : // the easiest case
797 : Status::Evicted => Ok(()),
798 : // it surely was evicted in between, but then there was a new access now; we can't know
799 : // if it'll succeed so let's just call it evicted
800 : Status::Downloading => Ok(()),
801 : // either the download which was started after eviction completed already, or it was
802 : // never evicted
803 : Status::Resident => Err(EvictionError::Downloaded),
804 : }
805 : }
806 :
    /// Returns a strong reference to the [`DownloadedLayer`], (re)initializing `self.inner` and,
    /// when the layer file is not usable on local disk, downloading it first.
    ///
    /// - `allow_download`: when `false`, a required download is reported as
    ///   [`DownloadError::DownloadRequired`] instead of being started.
    /// - `ctx`: when present, the request's download policy is consulted through
    ///   `check_expected_download` before a download is started.
    ///
    /// Every await point that may be dropped by the caller is wrapped in a scopeguard which
    /// counts the cancellation in `LAYER_IMPL_METRICS`; the guard is defused right after the
    /// await completes.
    ///
    /// Cancellation safe.
    async fn get_or_maybe_download(
        self: &Arc<Self>,
        allow_download: bool,
        ctx: Option<&RequestContext>,
    ) -> Result<Arc<DownloadedLayer>, DownloadError> {
        let (weak, permit) = {
            // get_or_init_detached can:
            // - be fast (mutex lock) OR uncontested semaphore permit acquire
            // - be slow (wait for semaphore permit or closing)
            let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());

            // Ok(Ok(..))     => initialized and the strong reference could be produced
            // Ok(Err(guard)) => initialized, but `get_and_upgrade` returned None
            // Err(permit)    => not initialized; we are now the initializer
            let locked = self
                .inner
                .get_or_init_detached()
                .await
                .map(|mut guard| guard.get_and_upgrade().ok_or(guard));

            // the await completed: defuse the cancellation counter
            scopeguard::ScopeGuard::into_inner(init_cancelled);

            match locked {
                // this path could have been a RwLock::read
                Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
                Ok(Ok((strong, _))) => {
                    // when upgraded back, the Arc<DownloadedLayer> is still available, but
                    // previously a `evict_and_wait` was received. this is the only place when we
                    // send out an update without holding the InitPermit.
                    //
                    // note that we also have dropped the Guard; this is fine, because we just made
                    // a state change and are holding a strong reference to be returned.
                    self.status.as_ref().unwrap().send_replace(Status::Resident);
                    LAYER_IMPL_METRICS
                        .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);

                    return Ok(strong);
                }
                Ok(Err(guard)) => {
                    // path to here: we won the eviction, the file should still be on the disk.
                    let (weak, permit) = guard.take_and_deinit();
                    (Some(weak), permit)
                }
                Err(permit) => (None, permit),
            }
        };

        if let Some(weak) = weak {
            // only drop the weak after dropping the heavier_once_cell guard
            assert!(
                matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
                "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
            );
        }

        let timeline = self
            .timeline
            .upgrade()
            .ok_or_else(|| DownloadError::TimelineShutdown)?;

        // count cancellations, which currently remain largely unexpected
        let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());

        // check if we really need to be downloaded: this can happen if a read access won the
        // semaphore before eviction.
        //
        // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
        // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
        let needs_download = self
            .needs_download()
            .await
            .map_err(DownloadError::PreStatFailed);

        // defuse before propagating the stat error: an error is not a cancellation
        scopeguard::ScopeGuard::into_inner(init_cancelled);

        let needs_download = needs_download?;

        let Some(reason) = needs_download else {
            // the file is present locally because eviction has not had a chance to run yet

            #[cfg(test)]
            self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
                .await?;

            LAYER_IMPL_METRICS.inc_init_needed_no_download();

            return Ok(self.initialize_after_layer_is_on_disk(permit));
        };

        // we must download; getting cancelled before spawning the download is not an issue as
        // any still running eviction would not find anything to evict.

        // a non-file at the layer path cannot be fixed by downloading
        if let NeedsDownload::NotFile(ft) = reason {
            return Err(DownloadError::NotFile(ft));
        }

        if timeline.remote_client.as_ref().is_none() {
            return Err(DownloadError::NoRemoteStorage);
        }

        if let Some(ctx) = ctx {
            self.check_expected_download(ctx)?;
        }

        if !allow_download {
            // this is only used from tests, but it is hard to test without the boolean
            return Err(DownloadError::DownloadRequired);
        }

        async move {
            tracing::info!(%reason, "downloading on-demand");

            // same cancellation accounting as above, now around waiting for the download
            let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
            let res = self.download_init_and_wait(timeline, permit).await?;
            scopeguard::ScopeGuard::into_inner(init_cancelled);
            Ok(res)
        }
        .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
        .await
    }
925 :
926 : /// Nag or fail per RequestContext policy
927 2 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
928 2 : use crate::context::DownloadBehavior::*;
929 2 : let b = ctx.download_behavior();
930 2 : match b {
931 2 : Download => Ok(()),
932 : Warn | Error => {
933 0 : tracing::info!(
934 0 : "unexpectedly on-demand downloading for task kind {:?}",
935 0 : ctx.task_kind()
936 0 : );
937 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
938 :
939 0 : let really_error =
940 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
941 :
942 0 : if really_error {
943 : // this check is only probablistic, seems like flakyness footgun
944 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
945 : } else {
946 0 : Ok(())
947 : }
948 : }
949 : }
950 2 : }
951 :
952 : /// Actual download, at most one is executed at the time.
953 6 : async fn download_init_and_wait(
954 6 : self: &Arc<Self>,
955 6 : timeline: Arc<Timeline>,
956 6 : permit: heavier_once_cell::InitPermit,
957 6 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
958 6 : debug_assert_current_span_has_tenant_and_timeline_id();
959 6 :
960 6 : let (tx, rx) = tokio::sync::oneshot::channel();
961 6 :
962 6 : let this: Arc<Self> = self.clone();
963 :
964 6 : let guard = timeline
965 6 : .gate
966 6 : .enter()
967 6 : .map_err(|_| DownloadError::DownloadCancelled)?;
968 :
969 6 : Self::spawn(
970 6 : async move {
971 0 : let _guard = guard;
972 0 :
973 0 : // now that we have commited to downloading, send out an update to:
974 0 : // - unhang any pending eviction
975 0 : // - break out of evict_and_wait
976 0 : this.status
977 0 : .as_ref()
978 0 : .unwrap()
979 0 : .send_replace(Status::Downloading);
980 6 :
981 6 : #[cfg(test)]
982 6 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
983 2 : .await
984 6 : .unwrap();
985 :
986 93 : let res = this.download_and_init(timeline, permit).await;
987 :
988 6 : if let Err(res) = tx.send(res) {
989 0 : match res {
990 0 : Ok(_res) => {
991 0 : tracing::debug!("layer initialized, but caller has been cancelled");
992 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
993 : }
994 0 : Err(e) => {
995 0 : tracing::info!(
996 0 : "layer file download failed, and caller has been cancelled: {e:?}"
997 0 : );
998 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
999 : }
1000 : }
1001 6 : }
1002 6 : }
1003 6 : .in_current_span(),
1004 6 : );
1005 6 :
1006 10 : match rx.await {
1007 6 : Ok(Ok(res)) => Ok(res),
1008 0 : Ok(Err(e)) => {
1009 0 : // sleep already happened in the spawned task, if it was not cancelled
1010 0 : match e.downcast_ref::<remote_storage::DownloadError>() {
1011 : // If the download failed due to its cancellation token,
1012 : // propagate the cancellation error upstream.
1013 : Some(remote_storage::DownloadError::Cancelled) => {
1014 0 : Err(DownloadError::DownloadCancelled)
1015 : }
1016 : // FIXME: this is not embedding the error because historically it would had
1017 : // been output to compute, however that is no longer the case.
1018 0 : _ => Err(DownloadError::DownloadFailed),
1019 : }
1020 : }
1021 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1022 : }
1023 6 : }
1024 :
1025 6 : async fn download_and_init(
1026 6 : self: &Arc<LayerInner>,
1027 6 : timeline: Arc<Timeline>,
1028 6 : permit: heavier_once_cell::InitPermit,
1029 6 : ) -> anyhow::Result<Arc<DownloadedLayer>> {
1030 6 : let client = timeline
1031 6 : .remote_client
1032 6 : .as_ref()
1033 6 : .expect("checked before download_init_and_wait");
1034 :
1035 6 : let result = client
1036 6 : .download_layer_file(&self.desc.filename(), &self.metadata(), &timeline.cancel)
1037 87 : .await;
1038 :
1039 6 : match result {
1040 6 : Ok(size) => {
1041 6 : assert_eq!(size, self.desc.file_size);
1042 :
1043 6 : match self.needs_download().await {
1044 0 : Ok(Some(reason)) => {
1045 0 : // this is really a bug in needs_download or remote timeline client
1046 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1047 : }
1048 6 : Ok(None) => {
1049 6 : // as expected
1050 6 : }
1051 0 : Err(e) => {
1052 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1053 : }
1054 : }
1055 :
1056 6 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1057 6 : timeline
1058 6 : .metrics
1059 6 : .resident_physical_size_add(self.desc.file_size);
1060 6 : self.consecutive_failures.store(0, Ordering::Relaxed);
1061 6 :
1062 6 : let since_last_eviction = self
1063 6 : .last_evicted_at
1064 6 : .lock()
1065 6 : .unwrap()
1066 6 : .take()
1067 6 : .map(|ts| ts.elapsed());
1068 6 : if let Some(since_last_eviction) = since_last_eviction {
1069 6 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1070 6 : }
1071 :
1072 6 : self.access_stats.record_residence_event(
1073 6 : LayerResidenceStatus::Resident,
1074 6 : LayerResidenceEventReason::ResidenceChange,
1075 6 : );
1076 6 :
1077 6 : Ok(self.initialize_after_layer_is_on_disk(permit))
1078 : }
1079 0 : Err(e) => {
1080 0 : let consecutive_failures =
1081 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1082 0 :
1083 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1084 :
1085 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1086 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1087 0 : 1.5,
1088 0 : 60.0,
1089 0 : );
1090 0 :
1091 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1092 :
1093 0 : tokio::select! {
1094 0 : _ = tokio::time::sleep(backoff) => {},
1095 0 : _ = timeline.cancel.cancelled() => {},
1096 0 : };
1097 :
1098 0 : Err(e)
1099 : }
1100 : }
1101 6 : }
1102 :
1103 : /// Initializes the `Self::inner` to a "resident" state.
1104 : ///
1105 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1106 : /// before calling this method.
1107 : ///
1108 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1109 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1110 12 : fn initialize_after_layer_is_on_disk(
1111 12 : self: &Arc<LayerInner>,
1112 12 : permit: heavier_once_cell::InitPermit,
1113 12 : ) -> Arc<DownloadedLayer> {
1114 12 : debug_assert_current_span_has_tenant_and_timeline_id();
1115 12 :
1116 12 : // disable any scheduled but not yet running eviction deletions for this initialization
1117 12 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1118 12 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1119 12 :
1120 12 : let res = Arc::new(DownloadedLayer {
1121 12 : owner: Arc::downgrade(self),
1122 12 : kind: tokio::sync::OnceCell::default(),
1123 12 : version: next_version,
1124 12 : });
1125 12 :
1126 12 : let waiters = self.inner.initializer_count();
1127 12 : if waiters > 0 {
1128 0 : tracing::info!(waiters, "completing layer init for other tasks");
1129 12 : }
1130 :
1131 12 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1132 12 :
1133 12 : self.inner.set(value, permit);
1134 12 :
1135 12 : res
1136 12 : }
1137 :
1138 24 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1139 24 : match tokio::fs::metadata(&self.path).await {
1140 16 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1141 8 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1142 0 : Err(e) => Err(e),
1143 : }
1144 24 : }
1145 :
1146 24 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1147 24 : match self.path.metadata() {
1148 24 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1149 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1150 0 : Err(e) => Err(e),
1151 : }
1152 24 : }
1153 :
1154 40 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1155 40 : // in future, this should include sha2-256 validation of the file.
1156 40 : if !m.is_file() {
1157 0 : Err(NeedsDownload::NotFile(m.file_type()))
1158 40 : } else if m.len() != self.desc.file_size {
1159 0 : Err(NeedsDownload::WrongSize {
1160 0 : actual: m.len(),
1161 0 : expected: self.desc.file_size,
1162 0 : })
1163 : } else {
1164 40 : Ok(())
1165 : }
1166 40 : }
1167 :
1168 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1169 0 : let layer_file_name = self.desc.filename().file_name();
1170 0 :
1171 0 : let resident = self
1172 0 : .inner
1173 0 : .get()
1174 0 : .map(|rowe| rowe.is_likely_resident())
1175 0 : .unwrap_or(false);
1176 0 :
1177 0 : let access_stats = self.access_stats.as_api_model(reset);
1178 0 :
1179 0 : if self.desc.is_delta {
1180 0 : let lsn_range = &self.desc.lsn_range;
1181 0 :
1182 0 : HistoricLayerInfo::Delta {
1183 0 : layer_file_name,
1184 0 : layer_file_size: self.desc.file_size,
1185 0 : lsn_start: lsn_range.start,
1186 0 : lsn_end: lsn_range.end,
1187 0 : remote: !resident,
1188 0 : access_stats,
1189 0 : }
1190 : } else {
1191 0 : let lsn = self.desc.image_layer_lsn();
1192 0 :
1193 0 : HistoricLayerInfo::Image {
1194 0 : layer_file_name,
1195 0 : layer_file_size: self.desc.file_size,
1196 0 : lsn_start: lsn,
1197 0 : remote: !resident,
1198 0 : access_stats,
1199 0 : }
1200 : }
1201 0 : }
1202 :
1203 : /// `DownloadedLayer` is being dropped, so it calls this method.
1204 24 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1205 24 : let can_evict = self.have_remote_client;
1206 :
1207 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1208 : // though here it is very likely
1209 24 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1210 :
1211 24 : if !can_evict {
1212 : // it would be nice to assert this case out, but we are in drop
1213 0 : span.in_scope(|| {
1214 0 : tracing::error!("bug in struct Layer: ResidentOrWantedEvicted has been downgraded while we have no remote storage");
1215 0 : });
1216 0 : return;
1217 24 : }
1218 24 :
1219 24 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1220 24 : // drop while the `self.inner` is being locked, leading to a deadlock.
1221 24 :
1222 24 : let start_evicting = async move {
1223 24 : #[cfg(test)]
1224 24 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1225 14 : .await
1226 24 : .expect("failpoint should not have errored");
1227 24 :
1228 24 : tracing::debug!("eviction started");
1229 :
1230 24 : let res = self.wait_for_turn_and_evict(only_version).await;
1231 : // metrics: ignore the Ok branch, it is not done yet
1232 24 : if let Err(e) = res {
1233 6 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1234 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1235 18 : }
1236 24 : };
1237 :
1238 24 : Self::spawn(start_evicting.instrument(span));
1239 24 : }
1240 :
    /// Attempts to evict the `DownloadedLayer` of version `only_version`.
    ///
    /// Waits for the guard or init permit of `self.inner` while watching the status channel, so
    /// that a started download or an unexpected state cancels the attempt early. When the slot
    /// still holds the `WantedEvicted` of `only_version`, or is uninitialized, the permit is
    /// handed to a blocking task which removes the file with `evict_blocking`.
    ///
    /// `Ok(())` means the blocking task has been spawned, not that the file is already gone;
    /// the outcome of the removal is reported through metrics and logging only.
    async fn wait_for_turn_and_evict(
        self: Arc<LayerInner>,
        only_version: usize,
    ) -> Result<(), EvictionCancelled> {
        /// Eviction may only proceed while the layer is reported as resident.
        fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
            use Status::*;
            match status {
                Resident => Ok(()),
                Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
                Downloading => Err(EvictionCancelled::LostToDownload),
            }
        }

        let timeline = self
            .timeline
            .upgrade()
            .ok_or(EvictionCancelled::TimelineGone)?;

        let mut rx = self
            .status
            .as_ref()
            .expect("LayerInner cannot be dropped, holding strong ref")
            .subscribe();

        is_good_to_continue(&rx.borrow_and_update())?;

        // held until this function returns; failing to enter is reported as the timeline
        // being gone
        let Ok(_gate) = timeline.gate.enter() else {
            return Err(EvictionCancelled::TimelineGone);
        };

        let permit = {
            // we cannot just `std::fs::remove_file` because there might already be an
            // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
            // operations must be done while holding the heavier_once_cell::InitPermit
            let mut wait = std::pin::pin!(self.inner.get_or_init_detached());

            let waited = loop {
                // we must race to the Downloading starting, otherwise we would have to wait until the
                // completion of the download. waiting for download could be long and hinder our
                // efforts to alert on "hanging" evictions.
                tokio::select! {
                    res = &mut wait => break res,
                    _ = rx.changed() => {
                        is_good_to_continue(&rx.borrow_and_update())?;
                        // two possibilities for Status::Resident:
                        // - the layer was found locally from disk by a read
                        // - we missed a bunch of updates and now the layer is
                        // again downloaded -- assume we'll fail later on with
                        // version check or AlreadyReinitialized
                    }
                }
            };

            // re-check now that we have the guard or permit; all updates should have happened
            // while holding the permit.
            is_good_to_continue(&rx.borrow_and_update())?;

            // the term deinitialize is used here, because clearing out the Weak will eventually
            // lead to deallocating the reference counted value, and the value we
            // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
            let (_weak, permit) = match waited {
                Ok(guard) => {
                    match &*guard {
                        ResidentOrWantedEvicted::WantedEvicted(_weak, version)
                            if *version == only_version =>
                        {
                            tracing::debug!(version, "deinitializing matching WantedEvicted");
                            let (weak, permit) = guard.take_and_deinit();
                            (Some(weak), permit)
                        }
                        ResidentOrWantedEvicted::WantedEvicted(_, version) => {
                            // if we were not doing the version check, we would need to try to
                            // upgrade the weak here to see if it really is dropped. version check
                            // is done instead assuming that it is cheaper.
                            tracing::debug!(
                                version,
                                only_version,
                                "version mismatch, not deinitializing"
                            );
                            return Err(EvictionCancelled::VersionCheckFailed);
                        }
                        ResidentOrWantedEvicted::Resident(_) => {
                            return Err(EvictionCancelled::AlreadyReinitialized);
                        }
                    }
                }
                Err(permit) => {
                    tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
                    (None, permit)
                }
            };

            permit
        };

        // captured here so that the blocking closure logs within the same span
        let span = tracing::Span::current();

        // start of the "time to evict" measurement recorded by the blocking task
        let spawned_at = std::time::Instant::now();

        // this is on purpose a detached spawn; we don't need to wait for it
        //
        // eviction completion reporting is the only thing hinging on this, and it can be just as
        // well from a spawn_blocking thread.
        //
        // important to note that now that we've acquired the permit we have made sure the evicted
        // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
        // there are multiple evictions. The rest is not cancellable, and we've now committed to
        // evicting.
        //
        // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
        // reads. We will need to add cancellation for that if necessary.
        Self::spawn_blocking(move || {
            let _span = span.entered();

            let res = self.evict_blocking(&timeline, &permit);

            let waiters = self.inner.initializer_count();

            if waiters > 0 {
                LAYER_IMPL_METRICS.inc_evicted_with_waiters();
            }

            let completed_in = spawned_at.elapsed();
            LAYER_IMPL_METRICS.record_time_to_evict(completed_in);

            match res {
                Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
                Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
            }

            tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
        });

        Ok(())
    }
1376 :
1377 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1378 18 : fn evict_blocking(
1379 18 : &self,
1380 18 : timeline: &Timeline,
1381 18 : _permit: &heavier_once_cell::InitPermit,
1382 18 : ) -> Result<(), EvictionCancelled> {
1383 18 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1384 18 :
1385 18 : match capture_mtime_and_remove(&self.path) {
1386 18 : Ok(local_layer_mtime) => {
1387 18 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1388 18 : match duration {
1389 18 : Ok(elapsed) => {
1390 18 : timeline
1391 18 : .metrics
1392 18 : .evictions_with_low_residence_duration
1393 18 : .read()
1394 18 : .unwrap()
1395 18 : .observe(elapsed);
1396 18 : tracing::info!(
1397 18 : residence_millis = elapsed.as_millis(),
1398 18 : "evicted layer after known residence period"
1399 18 : );
1400 : }
1401 : Err(_) => {
1402 0 : tracing::info!("evicted layer after unknown residence period");
1403 : }
1404 : }
1405 18 : timeline.metrics.evictions.inc();
1406 18 : timeline
1407 18 : .metrics
1408 18 : .resident_physical_size_sub(self.desc.file_size);
1409 : }
1410 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1411 0 : tracing::error!(
1412 0 : layer_size = %self.desc.file_size,
1413 0 : "failed to evict layer from disk, it was already gone"
1414 0 : );
1415 0 : return Err(EvictionCancelled::FileNotFound);
1416 : }
1417 0 : Err(e) => {
1418 0 : // FIXME: this should probably be an abort
1419 0 : tracing::error!("failed to evict file from disk: {e:#}");
1420 0 : return Err(EvictionCancelled::RemoveFailed);
1421 : }
1422 : }
1423 :
1424 18 : self.access_stats.record_residence_event(
1425 18 : LayerResidenceStatus::Evicted,
1426 18 : LayerResidenceEventReason::ResidenceChange,
1427 18 : );
1428 18 :
1429 18 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1430 18 :
1431 18 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1432 18 :
1433 18 : Ok(())
1434 18 : }
1435 :
1436 1400 : fn metadata(&self) -> LayerFileMetadata {
1437 1400 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1438 1400 : }
1439 :
1440 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1441 : ///
1442 : /// Synchronizing with spawned tasks is very complicated otherwise.
1443 30 : fn spawn<F>(fut: F)
1444 30 : where
1445 30 : F: std::future::Future<Output = ()> + Send + 'static,
1446 30 : {
1447 30 : #[cfg(test)]
1448 30 : tokio::task::spawn(fut);
1449 30 : #[cfg(not(test))]
1450 30 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1451 30 : }
1452 :
1453 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1454 464 : fn spawn_blocking<F>(f: F)
1455 464 : where
1456 464 : F: FnOnce() + Send + 'static,
1457 464 : {
1458 464 : #[cfg(test)]
1459 464 : tokio::task::spawn_blocking(f);
1460 464 : #[cfg(not(test))]
1461 464 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1462 464 : }
1463 : }
1464 :
1465 18 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1466 18 : let m = path.metadata()?;
1467 18 : let local_layer_mtime = m.modified()?;
1468 18 : std::fs::remove_file(path)?;
1469 18 : Ok(local_layer_mtime)
1470 18 : }
1471 :
/// Reasons why a requested eviction did not result in an evicted layer.
#[derive(Debug, thiserror::Error)]
pub(crate) enum EvictionError {
    /// There was nothing to evict: the layer had been evicted already.
    #[error("layer was already evicted")]
    NotFound,

    /// Evictions must always lose to downloads in races, and this time it happened.
    #[error("layer was downloaded instead")]
    Downloaded,

    /// No status change was observed before the caller supplied timeout elapsed.
    #[error("eviction did not happen within timeout")]
    Timeout,
}
1484 :
/// Error internal to the [`LayerInner::get_or_maybe_download`]
#[derive(Debug, thiserror::Error)]
pub(crate) enum DownloadError {
    /// The owning timeline could no longer be upgraded from its weak reference.
    #[error("timeline has already shutdown")]
    TimelineShutdown,
    /// A download is needed but the timeline has no remote client.
    #[error("no remote storage configured")]
    NoRemoteStorage,
    /// The request context asked for downloads to be errors, and the configuration does not
    /// downgrade such errors to warnings.
    #[error("context denies downloading")]
    ContextAndConfigReallyDeniesDownloads,
    /// A download is needed but the caller passed `allow_download = false`.
    #[error("downloading is really required but not allowed by this method")]
    DownloadRequired,
    /// Something other than a regular file exists at the layer path.
    #[error("layer path exists, but it is not a file: {0:?}")]
    NotFile(std::fs::FileType),
    /// Why no error here? Because it will be reported by page_service. We should have also done
    /// retries already.
    #[error("downloading evicted layer file failed")]
    DownloadFailed,
    /// The download was cancelled, the timeline gate could not be entered, or the download
    /// task went away without reporting a result.
    #[error("downloading failed, possibly for shutdown")]
    DownloadCancelled,
    /// The `stat` of the layer file done before deciding on a download failed.
    #[error("pre-condition: stat before download failed")]
    PreStatFailed(#[source] std::io::Error),

    /// Error injected by a test failpoint.
    #[cfg(test)]
    #[error("failpoint: {0:?}")]
    Failpoint(failpoints::FailpointKind),
}
1511 :
/// Reason why the local layer file cannot be used as is.
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
    /// Nothing exists at the layer path.
    NotFound,
    /// The layer path exists but is not a regular file.
    NotFile(std::fs::FileType),
    /// The file length on disk (`actual`) differs from the length recorded in the layer
    /// description (`expected`).
    WrongSize { actual: u64, expected: u64 },
}
1518 :
1519 : impl std::fmt::Display for NeedsDownload {
1520 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1521 6 : match self {
1522 6 : NeedsDownload::NotFound => write!(f, "file was not found"),
1523 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1524 0 : NeedsDownload::WrongSize { actual, expected } => {
1525 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1526 : }
1527 : }
1528 6 : }
1529 : }
1530 :
/// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
pub(crate) struct DownloadedLayer {
    /// Back-reference to the owning `LayerInner`; upgraded in `Drop` to start the eviction.
    owner: Weak<LayerInner>,
    // Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
    // DownloadedLayer
    /// Lazily loaded delta or image layer, or the permanent error from loading it.
    kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
    /// Version of `LayerInner` assigned when this value was created; an eviction started by
    /// dropping this value only applies to this version.
    version: usize,
}
1539 :
1540 : impl std::fmt::Debug for DownloadedLayer {
1541 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1542 0 : f.debug_struct("DownloadedLayer")
1543 0 : // owner omitted because it is always "Weak"
1544 0 : .field("kind", &self.kind)
1545 0 : .field("version", &self.version)
1546 0 : .finish()
1547 0 : }
1548 : }
1549 :
1550 : impl Drop for DownloadedLayer {
1551 494 : fn drop(&mut self) {
1552 494 : if let Some(owner) = self.owner.upgrade() {
1553 24 : owner.on_downloaded_layer_drop(self.version);
1554 470 : } else {
1555 470 : // no need to do anything, we are shutting down
1556 470 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
1557 470 : }
1558 494 : }
1559 : }
1560 :
1561 : impl DownloadedLayer {
1562 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`], or fails to
1563 : /// initialize it permanently.
1564 : ///
1565 : /// `owner` parameter is a strong reference at the same `LayerInner` as the
1566 : /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
1567 : /// we will always have the LayerInner on the callstack, so we can just use it.
1568 125008 : async fn get<'a>(
1569 125008 : &'a self,
1570 125008 : owner: &Arc<LayerInner>,
1571 125008 : ctx: &RequestContext,
1572 125008 : ) -> anyhow::Result<&'a LayerKind> {
1573 125008 : let init = || async {
1574 618 : assert_eq!(
1575 618 : Weak::as_ptr(&self.owner),
1576 618 : Arc::as_ptr(owner),
1577 0 : "these are the same, just avoiding the upgrade"
1578 : );
1579 :
1580 618 : let res = if owner.desc.is_delta {
1581 594 : let summary = Some(delta_layer::Summary::expected(
1582 594 : owner.desc.tenant_shard_id.tenant_id,
1583 594 : owner.desc.timeline_id,
1584 594 : owner.desc.key_range.clone(),
1585 594 : owner.desc.lsn_range.clone(),
1586 594 : ));
1587 594 : delta_layer::DeltaLayerInner::load(
1588 594 : &owner.path,
1589 594 : summary,
1590 594 : Some(owner.conf.max_vectored_read_bytes),
1591 594 : ctx,
1592 594 : )
1593 596 : .await
1594 594 : .map(|res| res.map(LayerKind::Delta))
1595 : } else {
1596 24 : let lsn = owner.desc.image_layer_lsn();
1597 24 : let summary = Some(image_layer::Summary::expected(
1598 24 : owner.desc.tenant_shard_id.tenant_id,
1599 24 : owner.desc.timeline_id,
1600 24 : owner.desc.key_range.clone(),
1601 24 : lsn,
1602 24 : ));
1603 24 : image_layer::ImageLayerInner::load(
1604 24 : &owner.path,
1605 24 : lsn,
1606 24 : summary,
1607 24 : Some(owner.conf.max_vectored_read_bytes),
1608 24 : ctx,
1609 24 : )
1610 24 : .await
1611 24 : .map(|res| res.map(LayerKind::Image))
1612 : };
1613 :
1614 618 : match res {
1615 618 : Ok(Ok(layer)) => Ok(Ok(layer)),
1616 0 : Ok(Err(transient)) => Err(transient),
1617 0 : Err(permanent) => {
1618 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1619 0 : // TODO(#5815): we are not logging all errors, so temporarily log them **once**
1620 0 : // here as well
1621 0 : let permanent = permanent.context("load layer");
1622 0 : tracing::error!("layer loading failed permanently: {permanent:#}");
1623 0 : Ok(Err(permanent))
1624 : }
1625 : }
1626 1236 : };
1627 125008 : self.kind
1628 125008 : .get_or_try_init(init)
1629 : // return transient errors using `?`
1630 620 : .await?
1631 125008 : .as_ref()
1632 125008 : .map_err(|e| {
1633 0 : // errors are not clonabled, cannot but stringify
1634 0 : // test_broken_timeline matches this string
1635 0 : anyhow::anyhow!("layer loading failed: {e:#}")
1636 125008 : })
1637 125008 : }
1638 :
1639 124542 : async fn get_value_reconstruct_data(
1640 124542 : &self,
1641 124542 : key: Key,
1642 124542 : lsn_range: Range<Lsn>,
1643 124542 : reconstruct_data: &mut ValueReconstructState,
1644 124542 : owner: &Arc<LayerInner>,
1645 124542 : ctx: &RequestContext,
1646 124542 : ) -> anyhow::Result<ValueReconstructResult> {
1647 124542 : use LayerKind::*;
1648 124542 :
1649 124542 : match self.get(owner, ctx).await? {
1650 124016 : Delta(d) => {
1651 124016 : d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
1652 22865 : .await
1653 : }
1654 526 : Image(i) => {
1655 526 : i.get_value_reconstruct_data(key, reconstruct_data, ctx)
1656 448 : .await
1657 : }
1658 : }
1659 124542 : }
1660 :
1661 18 : async fn get_values_reconstruct_data(
1662 18 : &self,
1663 18 : keyspace: KeySpace,
1664 18 : lsn_range: Range<Lsn>,
1665 18 : reconstruct_data: &mut ValuesReconstructState,
1666 18 : owner: &Arc<LayerInner>,
1667 18 : ctx: &RequestContext,
1668 18 : ) -> Result<(), GetVectoredError> {
1669 18 : use LayerKind::*;
1670 18 :
1671 18 : match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
1672 18 : Delta(d) => {
1673 18 : d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
1674 24 : .await
1675 : }
1676 0 : Image(i) => {
1677 0 : i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
1678 0 : .await
1679 : }
1680 : }
1681 18 : }
1682 :
1683 4 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1684 4 : use LayerKind::*;
1685 4 : match self.get(owner, ctx).await? {
1686 4 : Delta(d) => d.dump(ctx).await?,
1687 0 : Image(i) => i.dump(ctx).await?,
1688 : }
1689 :
1690 4 : Ok(())
1691 4 : }
1692 : }
1693 :
/// Wrapper around an actual layer implementation.
#[derive(Debug)]
enum LayerKind {
    /// Loaded contents of a delta layer.
    Delta(delta_layer::DeltaLayerInner),
    /// Loaded contents of an image layer.
    Image(image_layer::ImageLayerInner),
}
1700 :
/// Guard for forcing a layer be resident while it exists.
#[derive(Clone)]
pub(crate) struct ResidentLayer {
    /// The layer this guard belongs to.
    owner: Layer,
    /// Strong reference to the downloaded state; eviction is started from the `Drop` of
    /// `DownloadedLayer`, which cannot run while this reference exists.
    downloaded: Arc<DownloadedLayer>,
}
1707 :
1708 : impl std::fmt::Display for ResidentLayer {
1709 1390 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1710 1390 : write!(f, "{}", self.owner)
1711 1390 : }
1712 : }
1713 :
1714 : impl std::fmt::Debug for ResidentLayer {
1715 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1716 0 : write!(f, "{}", self.owner)
1717 0 : }
1718 : }
1719 :
1720 : impl ResidentLayer {
1721 : /// Release the eviction guard, converting back into a plain [`Layer`].
1722 : ///
1723 : /// You can access the [`Layer`] also by using `as_ref`.
1724 460 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1725 460 : self.into()
1726 460 : }
1727 :
1728 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1729 1768 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1730 : pub(crate) async fn load_keys<'a>(
1731 : &'a self,
1732 : ctx: &RequestContext,
1733 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1734 : use LayerKind::*;
1735 :
1736 : let owner = &self.owner.0;
1737 :
1738 : match self.downloaded.get(owner, ctx).await? {
1739 : Delta(ref d) => {
1740 : owner
1741 : .access_stats
1742 : .record_access(LayerAccessKind::KeyIter, ctx);
1743 :
1744 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1745 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1746 : // while it's being held.
1747 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1748 : .await
1749 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1750 : }
1751 : Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
1752 : }
1753 : }
1754 :
1755 843 : pub(crate) fn local_path(&self) -> &Utf8Path {
1756 843 : &self.owner.0.path
1757 843 : }
1758 :
1759 948 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1760 948 : self.owner.metadata()
1761 948 : }
1762 :
1763 : #[cfg(test)]
1764 2 : pub(crate) async fn get_inner_delta<'a>(
1765 2 : &'a self,
1766 2 : ctx: &RequestContext,
1767 2 : ) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> {
1768 2 : let owner = &self.owner.0;
1769 2 : match self.downloaded.get(owner, ctx).await? {
1770 2 : LayerKind::Delta(d) => Ok(d),
1771 0 : LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")),
1772 : }
1773 2 : }
1774 : }
1775 :
1776 : impl AsLayerDesc for ResidentLayer {
1777 2712 : fn layer_desc(&self) -> &PersistentLayerDesc {
1778 2712 : self.owner.layer_desc()
1779 2712 : }
1780 : }
1781 :
1782 : impl AsRef<Layer> for ResidentLayer {
1783 1182 : fn as_ref(&self) -> &Layer {
1784 1182 : &self.owner
1785 1182 : }
1786 : }
1787 :
1788 : /// Drop the eviction guard.
1789 : impl From<ResidentLayer> for Layer {
1790 460 : fn from(value: ResidentLayer) -> Self {
1791 460 : value.owner
1792 460 : }
1793 : }
1794 :
1795 : use metrics::IntCounter;
1796 :
/// Metrics recorded by the `Layer` implementation: eviction and deletion
/// counters, counters for events assumed to be rare, and two histograms.
pub(crate) struct LayerImplMetrics {
    /// Evictions started in the Layer implementation.
    started_evictions: IntCounter,
    /// Evictions completed in the Layer implementation.
    completed_evictions: IntCounter,
    /// Evictions cancelled or failed, one counter per [`EvictionCancelled`] reason.
    cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,

    /// Deletions on drop that have been started.
    started_deletes: IntCounter,
    /// Deletions on drop that have completed.
    completed_deletes: IntCounter,
    /// Deletions on drop that failed, one counter per [`DeleteFailed`] reason.
    failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,

    /// Unexpected or assumed-rare events, one counter per [`RareEvent`].
    rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
    /// Times Layer initialization was cancelled.
    inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
    /// Time between evicting and re-downloading, observed in seconds.
    redownload_after: metrics::Histogram,
    /// Time eviction held the permit, observed in seconds.
    time_to_evict: metrics::Histogram,
}
1811 :
1812 : impl Default for LayerImplMetrics {
1813 36 : fn default() -> Self {
1814 36 : use enum_map::Enum;
1815 36 :
1816 36 : // reminder: these will be pageserver_layer_* with "_total" suffix
1817 36 :
1818 36 : let started_evictions = metrics::register_int_counter!(
1819 36 : "pageserver_layer_started_evictions",
1820 36 : "Evictions started in the Layer implementation"
1821 36 : )
1822 36 : .unwrap();
1823 36 : let completed_evictions = metrics::register_int_counter!(
1824 36 : "pageserver_layer_completed_evictions",
1825 36 : "Evictions completed in the Layer implementation"
1826 36 : )
1827 36 : .unwrap();
1828 36 :
1829 36 : let cancelled_evictions = metrics::register_int_counter_vec!(
1830 36 : "pageserver_layer_cancelled_evictions_count",
1831 36 : "Different reasons for evictions to have been cancelled or failed",
1832 36 : &["reason"]
1833 36 : )
1834 36 : .unwrap();
1835 36 :
1836 324 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1837 324 : let reason = EvictionCancelled::from_usize(i);
1838 324 : let s = reason.as_str();
1839 324 : cancelled_evictions.with_label_values(&[s])
1840 324 : }));
1841 36 :
1842 36 : let started_deletes = metrics::register_int_counter!(
1843 36 : "pageserver_layer_started_deletes",
1844 36 : "Deletions on drop pending in the Layer implementation"
1845 36 : )
1846 36 : .unwrap();
1847 36 : let completed_deletes = metrics::register_int_counter!(
1848 36 : "pageserver_layer_completed_deletes",
1849 36 : "Deletions on drop completed in the Layer implementation"
1850 36 : )
1851 36 : .unwrap();
1852 36 :
1853 36 : let failed_deletes = metrics::register_int_counter_vec!(
1854 36 : "pageserver_layer_failed_deletes_count",
1855 36 : "Different reasons for deletions on drop to have failed",
1856 36 : &["reason"]
1857 36 : )
1858 36 : .unwrap();
1859 36 :
1860 72 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1861 72 : let reason = DeleteFailed::from_usize(i);
1862 72 : let s = reason.as_str();
1863 72 : failed_deletes.with_label_values(&[s])
1864 72 : }));
1865 36 :
1866 36 : let rare_counters = metrics::register_int_counter_vec!(
1867 36 : "pageserver_layer_assumed_rare_count",
1868 36 : "Times unexpected or assumed rare event happened",
1869 36 : &["event"]
1870 36 : )
1871 36 : .unwrap();
1872 36 :
1873 252 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1874 252 : let event = RareEvent::from_usize(i);
1875 252 : let s = event.as_str();
1876 252 : rare_counters.with_label_values(&[s])
1877 252 : }));
1878 36 :
1879 36 : let inits_cancelled = metrics::register_int_counter!(
1880 36 : "pageserver_layer_inits_cancelled_count",
1881 36 : "Times Layer initialization was cancelled",
1882 36 : )
1883 36 : .unwrap();
1884 36 :
1885 36 : let redownload_after = {
1886 36 : let minute = 60.0;
1887 36 : let hour = 60.0 * minute;
1888 36 : metrics::register_histogram!(
1889 36 : "pageserver_layer_redownloaded_after",
1890 36 : "Time between evicting and re-downloading.",
1891 36 : vec![
1892 36 : 10.0,
1893 36 : 30.0,
1894 36 : minute,
1895 36 : 5.0 * minute,
1896 36 : 15.0 * minute,
1897 36 : 30.0 * minute,
1898 36 : hour,
1899 36 : 12.0 * hour,
1900 36 : ]
1901 36 : )
1902 36 : .unwrap()
1903 36 : };
1904 36 :
1905 36 : let time_to_evict = metrics::register_histogram!(
1906 36 : "pageserver_layer_eviction_held_permit_seconds",
1907 36 : "Time eviction held the permit.",
1908 36 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
1909 36 : )
1910 36 : .unwrap();
1911 36 :
1912 36 : Self {
1913 36 : started_evictions,
1914 36 : completed_evictions,
1915 36 : cancelled_evictions,
1916 36 :
1917 36 : started_deletes,
1918 36 : completed_deletes,
1919 36 : failed_deletes,
1920 36 :
1921 36 : rare_counters,
1922 36 : inits_cancelled,
1923 36 : redownload_after,
1924 36 : time_to_evict,
1925 36 : }
1926 36 : }
1927 : }
1928 :
1929 : impl LayerImplMetrics {
1930 24 : fn inc_started_evictions(&self) {
1931 24 : self.started_evictions.inc();
1932 24 : }
1933 18 : fn inc_completed_evictions(&self) {
1934 18 : self.completed_evictions.inc();
1935 18 : }
1936 476 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
1937 476 : self.cancelled_evictions[reason].inc()
1938 476 : }
1939 :
1940 446 : fn inc_started_deletes(&self) {
1941 446 : self.started_deletes.inc();
1942 446 : }
1943 446 : fn inc_completed_deletes(&self) {
1944 446 : self.completed_deletes.inc();
1945 446 : }
1946 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
1947 0 : self.failed_deletes[reason].inc();
1948 0 : }
1949 :
1950 : /// Counted separatedly from failed layer deletes because we will complete the layer deletion
1951 : /// attempt regardless of failure to delete local file.
1952 0 : fn inc_delete_removes_failed(&self) {
1953 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
1954 0 : }
1955 :
1956 : /// Expected rare just as cancellations are rare, but we could have cancellations separate from
1957 : /// the single caller which can start the download, so use this counter to separte them.
1958 0 : fn inc_init_completed_without_requester(&self) {
1959 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
1960 0 : }
1961 :
1962 : /// Expected rare because cancellations are unexpected, and failures are unexpected
1963 0 : fn inc_download_failed_without_requester(&self) {
1964 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
1965 0 : }
1966 :
1967 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
1968 : ///
1969 : /// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
1970 : /// Option.
1971 0 : fn inc_raced_wanted_evicted_accesses(&self) {
1972 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
1973 0 : }
1974 :
1975 : /// These are only expected for [`Self::inc_init_cancelled`] amount when
1976 : /// running with remote storage.
1977 6 : fn inc_init_needed_no_download(&self) {
1978 6 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
1979 6 : }
1980 :
1981 : /// Expected rare because all layer files should be readable and good
1982 0 : fn inc_permanent_loading_failures(&self) {
1983 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
1984 0 : }
1985 :
1986 0 : fn inc_init_cancelled(&self) {
1987 0 : self.inits_cancelled.inc()
1988 0 : }
1989 :
1990 6 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
1991 6 : self.redownload_after.observe(duration.as_secs_f64())
1992 6 : }
1993 :
1994 : /// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
1995 : /// instead cancel eviction if we would have read waiters. We cannot however separate reads
1996 : /// from other evictions, so this could have noise as well.
1997 0 : fn inc_evicted_with_waiters(&self) {
1998 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
1999 0 : }
2000 :
2001 : /// Recorded at least initially as the permit is now acquired in async context before
2002 : /// spawn_blocking action.
2003 18 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2004 18 : self.time_to_evict.observe(duration.as_secs_f64())
2005 18 : }
2006 : }
2007 :
/// Reasons for an eviction to have been cancelled or failed.
///
/// Each variant is reported as the `reason` label of the
/// `pageserver_layer_cancelled_evictions_count` counter; the label text comes
/// from [`EvictionCancelled::as_str`].
#[derive(Debug, Clone, Copy, enum_map::Enum)]
enum EvictionCancelled {
    LayerGone,
    TimelineGone,
    VersionCheckFailed,
    FileNotFound,
    RemoveFailed,
    AlreadyReinitialized,
    /// Not evicted because of a pending reinitialization
    LostToDownload,
    /// After eviction, there was a new layer access which cancelled the eviction.
    UpgradedBackOnAccess,
    UnexpectedEvictedState,
}
2022 :
2023 : impl EvictionCancelled {
2024 324 : fn as_str(&self) -> &'static str {
2025 324 : match self {
2026 36 : EvictionCancelled::LayerGone => "layer_gone",
2027 36 : EvictionCancelled::TimelineGone => "timeline_gone",
2028 36 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2029 36 : EvictionCancelled::FileNotFound => "file_not_found",
2030 36 : EvictionCancelled::RemoveFailed => "remove_failed",
2031 36 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2032 36 : EvictionCancelled::LostToDownload => "lost_to_download",
2033 36 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2034 36 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2035 : }
2036 324 : }
2037 : }
2038 :
/// Reasons for a deletion on drop to have failed.
///
/// Each variant is reported as the `reason` label of the
/// `pageserver_layer_failed_deletes_count` counter; the label text comes from
/// [`DeleteFailed::as_str`].
#[derive(enum_map::Enum)]
enum DeleteFailed {
    TimelineGone,
    DeleteSchedulingFailed,
}
2044 :
2045 : impl DeleteFailed {
2046 72 : fn as_str(&self) -> &'static str {
2047 72 : match self {
2048 36 : DeleteFailed::TimelineGone => "timeline_gone",
2049 36 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2050 : }
2051 72 : }
2052 : }
2053 :
/// Unexpected or assumed-rare events counted by [`LayerImplMetrics`].
///
/// Each variant is reported as the `event` label of the
/// `pageserver_layer_assumed_rare_count` counter; the label text comes from
/// [`RareEvent::as_str`].
#[derive(enum_map::Enum)]
enum RareEvent {
    RemoveOnDropFailed,
    InitCompletedWithoutRequester,
    DownloadFailedWithoutRequester,
    UpgradedWantedEvicted,
    InitWithoutDownload,
    PermanentLoadingFailure,
    EvictedWithWaiters,
}
2064 :
2065 : impl RareEvent {
2066 252 : fn as_str(&self) -> &'static str {
2067 252 : use RareEvent::*;
2068 252 :
2069 252 : match self {
2070 36 : RemoveOnDropFailed => "remove_on_drop_failed",
2071 36 : InitCompletedWithoutRequester => "init_completed_without",
2072 36 : DownloadFailedWithoutRequester => "download_failed_without",
2073 36 : UpgradedWantedEvicted => "raced_wanted_evicted",
2074 36 : InitWithoutDownload => "init_needed_no_download",
2075 36 : PermanentLoadingFailure => "permanent_loading_failure",
2076 36 : EvictedWithWaiters => "evicted_with_waiters",
2077 : }
2078 252 : }
2079 : }
2080 :
/// Shared [`LayerImplMetrics`] instance, created on first access through
/// [`LayerImplMetrics::default`], which is where the metrics get registered.
pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
    once_cell::sync::Lazy::new(LayerImplMetrics::default);
|