Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::{
5 : HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
6 : };
7 : use pageserver_api::shard::ShardIndex;
8 : use std::ops::Range;
9 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10 : use std::sync::{Arc, Weak};
11 : use std::time::{Duration, SystemTime};
12 : use tracing::Instrument;
13 : use utils::lsn::Lsn;
14 : use utils::sync::heavier_once_cell;
15 :
16 : use crate::config::PageServerConf;
17 : use crate::context::RequestContext;
18 : use crate::repository::Key;
19 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
20 : use crate::tenant::timeline::GetVectoredError;
21 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
22 :
23 : use super::delta_layer::{self, DeltaEntry};
24 : use super::image_layer;
25 : use super::{
26 : AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerFileName, PersistentLayerDesc,
27 : ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
28 : };
29 :
30 : use utils::generation::Generation;
31 :
32 : #[cfg(test)]
33 : mod tests;
34 :
35 : #[cfg(test)]
36 : mod failpoints;
37 :
38 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
39 : /// a range of LSNs.
40 : ///
41 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
42 : /// layers are used to ingest incoming WAL, and provide fast access to the
43 : /// recent page versions. On-disk layers are stored as files on disk, and are
44 : /// immutable. This type represents the on-disk kind, while the in-memory kind is represented by
45 : /// [`InMemoryLayer`].
46 : ///
47 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
48 : /// A delta layer contains all modifications within a range of LSNs and keys.
49 : /// An image layer is a snapshot of all the data in a key-range, at a single
50 : /// LSN.
51 : ///
52 : /// This type models the on-disk layers, which can be evicted and on-demand downloaded. As a
53 : /// general goal, read accesses should always win eviction and eviction should not wait for
54 : /// download.
55 : ///
56 : /// ### State transitions
57 : ///
58 : /// The internal state of `Layer` is composed of, most importantly, the on-filesystem state and the
59 : /// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
60 : /// right size) or deleted.
61 : ///
62 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
63 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
64 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
65 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
66 : /// cancel the eviction.
67 : ///
68 : /// ```text
69 : ///  +-----------------+   get_or_maybe_download    +--------------------------------+
70 : ///  | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
71 : ///  |     ENOENT      |                         /->|                                |
72 : ///  +-----------------+                         |  +--------------------------------+
73 : ///                  ^                           |                         |       ^
74 : ///                  |    get_or_maybe_download  |                         |       | get_or_maybe_download, either:
75 : ///   evict_blocking | /-------------------------/                         |       | - upgrade weak to strong
76 : ///                  | |                                                   |       | - re-initialize without download
77 : ///                  | |                                    evict_and_wait |       |
78 : ///  +-----------------+                                                   v       |
79 : ///  | not initialized |  on_downloaded_layer_drop  +--------------------------------------+
80 : ///  | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
81 : ///  +-----------------+                            +--------------------------------------+
82 : /// ```
83 : ///
84 : /// ### Unsupported
85 : ///
86 : /// - Eviction by the operator deleting files from the filesystem
87 : ///
88 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
89 : #[derive(Clone)]
90 : pub(crate) struct Layer(Arc<LayerInner>); // clones share one `LayerInner` through the `Arc`
91 :
92 : impl std::fmt::Display for Layer {
93 1398 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 1398 : if matches!(self.0.generation, Generation::Broken) {
95 0 : write!(f, "{}-broken", self.layer_desc().short_id())
96 : } else {
97 1398 : write!(
98 1398 : f,
99 1398 : "{}{}",
100 1398 : self.layer_desc().short_id(),
101 1398 : self.0.generation.get_suffix()
102 1398 : )
103 : }
104 1398 : }
105 : }
106 :
107 : impl std::fmt::Debug for Layer {
108 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 0 : write!(f, "{}", self)
110 0 : }
111 : }
112 :
113 : impl AsLayerDesc for Layer {
114 381176 : fn layer_desc(&self) -> &PersistentLayerDesc {
115 381176 : self.0.layer_desc()
116 381176 : }
117 : }
118 :
119 : impl PartialEq for Layer {
120 4 : fn eq(&self, other: &Self) -> bool {
121 4 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
122 4 : }
123 : }
124 :
125 : impl Layer {
126 : /// Creates a layer value for a file we know to not be resident.
127 0 : pub(crate) fn for_evicted(
128 0 : conf: &'static PageServerConf,
129 0 : timeline: &Arc<Timeline>,
130 0 : file_name: LayerFileName,
131 0 : metadata: LayerFileMetadata,
132 0 : ) -> Self {
133 0 : let desc = PersistentLayerDesc::from_filename(
134 0 : timeline.tenant_shard_id,
135 0 : timeline.timeline_id,
136 0 : file_name,
137 0 : metadata.file_size(),
138 0 : );
139 0 :
140 0 : let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
141 0 :
142 0 : let owner = Layer(Arc::new(LayerInner::new(
143 0 : conf,
144 0 : timeline,
145 0 : access_stats,
146 0 : desc,
147 0 : None,
148 0 : metadata.generation,
149 0 : metadata.shard,
150 0 : )));
151 0 :
152 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
153 :
154 0 : owner
155 0 : }
156 :
157 : /// Creates a Layer value for a file we know to be resident in timeline directory.
158 24 : pub(crate) fn for_resident(
159 24 : conf: &'static PageServerConf,
160 24 : timeline: &Arc<Timeline>,
161 24 : file_name: LayerFileName,
162 24 : metadata: LayerFileMetadata,
163 24 : ) -> ResidentLayer {
164 24 : let desc = PersistentLayerDesc::from_filename(
165 24 : timeline.tenant_shard_id,
166 24 : timeline.timeline_id,
167 24 : file_name,
168 24 : metadata.file_size(),
169 24 : );
170 24 :
171 24 : let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
172 24 :
173 24 : let mut resident = None;
174 24 :
175 24 : let owner = Layer(Arc::new_cyclic(|owner| {
176 24 : let inner = Arc::new(DownloadedLayer {
177 24 : owner: owner.clone(),
178 24 : kind: tokio::sync::OnceCell::default(),
179 24 : version: 0,
180 24 : });
181 24 : resident = Some(inner.clone());
182 24 :
183 24 : LayerInner::new(
184 24 : conf,
185 24 : timeline,
186 24 : access_stats,
187 24 : desc,
188 24 : Some(inner),
189 24 : metadata.generation,
190 24 : metadata.shard,
191 24 : )
192 24 : }));
193 24 :
194 24 : let downloaded = resident.expect("just initialized");
195 24 :
196 24 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
197 :
198 24 : timeline
199 24 : .metrics
200 24 : .resident_physical_size_add(metadata.file_size());
201 24 :
202 24 : ResidentLayer { downloaded, owner }
203 24 : }
204 :
205 : /// Creates a Layer value for freshly written out new layer file by renaming it from a
206 : /// temporary path.
207 960 : pub(crate) fn finish_creating(
208 960 : conf: &'static PageServerConf,
209 960 : timeline: &Arc<Timeline>,
210 960 : desc: PersistentLayerDesc,
211 960 : temp_path: &Utf8Path,
212 960 : ) -> anyhow::Result<ResidentLayer> {
213 960 : let mut resident = None;
214 960 :
215 960 : let owner = Layer(Arc::new_cyclic(|owner| {
216 960 : let inner = Arc::new(DownloadedLayer {
217 960 : owner: owner.clone(),
218 960 : kind: tokio::sync::OnceCell::default(),
219 960 : version: 0,
220 960 : });
221 960 : resident = Some(inner.clone());
222 960 : let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
223 960 : access_stats.record_residence_event(
224 960 : LayerResidenceStatus::Resident,
225 960 : LayerResidenceEventReason::LayerCreate,
226 960 : );
227 960 : LayerInner::new(
228 960 : conf,
229 960 : timeline,
230 960 : access_stats,
231 960 : desc,
232 960 : Some(inner),
233 960 : timeline.generation,
234 960 : timeline.get_shard_index(),
235 960 : )
236 960 : }));
237 960 :
238 960 : let downloaded = resident.expect("just initialized");
239 960 :
240 960 : // if the rename works, the path is as expected
241 960 : // TODO: sync system call
242 960 : std::fs::rename(temp_path, owner.local_path())
243 960 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
244 :
245 960 : Ok(ResidentLayer { downloaded, owner })
246 960 : }
247 :
248 : /// Requests the layer to be evicted and waits for this to be done.
249 : ///
250 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
251 : ///
252 : /// If for a bad luck or blocking of the executor, we miss the actual eviction and the layer is
253 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
254 : ///
255 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
256 : /// will happen regardless the future returned by this method completing unless there is a
257 : /// read access before eviction gets to complete.
258 : ///
259 : /// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
260 : /// of download-evict cycle on retry.
261 36 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
262 48 : self.0.evict_and_wait(timeout).await
263 32 : }
264 :
265 : /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
266 : /// then.
267 : ///
268 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
269 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
270 : /// the value this is called on gets dropped.
271 : ///
272 : /// This is ensured by both of those methods accepting references to Layer.
273 : ///
274 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
275 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
276 454 : pub(crate) fn delete_on_drop(&self) {
277 454 : self.0.delete_on_drop();
278 454 : }
279 :
280 : /// Return data needed to reconstruct given page at LSN.
281 : ///
282 : /// It is up to the caller to collect more data from the previous layer and
283 : /// perform WAL redo, if necessary.
284 : ///
285 : /// # Cancellation-Safety
286 : ///
287 : /// This method is cancellation-safe.
288 123768 : pub(crate) async fn get_value_reconstruct_data(
289 123768 : &self,
290 123768 : key: Key,
291 123768 : lsn_range: Range<Lsn>,
292 123768 : reconstruct_data: &mut ValueReconstructState,
293 123768 : ctx: &RequestContext,
294 123768 : ) -> anyhow::Result<ValueReconstructResult> {
295 : use anyhow::ensure;
296 :
297 123768 : let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
298 123768 : self.0
299 123768 : .access_stats
300 123768 : .record_access(LayerAccessKind::GetValueReconstructData, ctx);
301 123768 :
302 123768 : if self.layer_desc().is_delta {
303 123242 : ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
304 123242 : ensure!(self.layer_desc().key_range.contains(&key));
305 : } else {
306 526 : ensure!(self.layer_desc().key_range.contains(&key));
307 526 : ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
308 526 : ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
309 : }
310 :
311 123768 : layer
312 123768 : .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
313 123768 : .instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
314 23444 : .await
315 123768 : .with_context(|| format!("get_value_reconstruct_data for layer {self}"))
316 123768 : }
317 :
318 18 : pub(crate) async fn get_values_reconstruct_data(
319 18 : &self,
320 18 : keyspace: KeySpace,
321 18 : lsn_range: Range<Lsn>,
322 18 : reconstruct_data: &mut ValuesReconstructState,
323 18 : ctx: &RequestContext,
324 18 : ) -> Result<(), GetVectoredError> {
325 18 : let layer = self
326 18 : .0
327 18 : .get_or_maybe_download(true, Some(ctx))
328 0 : .await
329 18 : .map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;
330 :
331 18 : self.0
332 18 : .access_stats
333 18 : .record_access(LayerAccessKind::GetValueReconstructData, ctx);
334 18 :
335 18 : layer
336 18 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
337 18 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
338 41 : .await
339 18 : }
340 :
341 : /// Download the layer if evicted.
342 : ///
343 : /// Will not error when the layer is already downloaded.
344 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
345 0 : self.0.get_or_maybe_download(true, None).await?;
346 0 : Ok(())
347 0 : }
348 :
349 : /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
350 : /// while the guard exists.
351 : ///
352 : /// Returns None if the layer is currently evicted or becoming evicted.
353 : #[cfg(test)]
354 20 : pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
355 20 : let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
356 :
357 14 : Some(ResidentLayer {
358 14 : downloaded,
359 14 : owner: self.clone(),
360 14 : })
361 20 : }
362 :
363 : /// Weak indicator of is the layer resident or not. Good enough for eviction, which can deal
364 : /// with `EvictionError::NotFound`.
365 : ///
366 : /// Returns `true` if this layer might be resident, or `false`, if it most likely evicted or
367 : /// will be unless a read happens soon.
368 44 : pub(crate) fn is_likely_resident(&self) -> bool {
369 44 : self.0
370 44 : .inner
371 44 : .get()
372 44 : .map(|rowe| rowe.is_likely_resident())
373 44 : .unwrap_or(false)
374 44 : }
375 :
376 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
377 454 : pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
378 454 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
379 :
380 454 : Ok(ResidentLayer {
381 454 : downloaded,
382 454 : owner: self.clone(),
383 454 : })
384 454 : }
385 :
386 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
387 0 : self.0.info(reset)
388 0 : }
389 :
390 0 : pub(crate) fn access_stats(&self) -> &LayerAccessStats {
391 0 : &self.0.access_stats
392 0 : }
393 :
394 1108 : pub(crate) fn local_path(&self) -> &Utf8Path {
395 1108 : &self.0.path
396 1108 : }
397 :
398 956 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
399 956 : self.0.metadata()
400 956 : }
401 :
402 : /// Traditional debug dumping facility
403 : #[allow(unused)]
404 4 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
405 4 : self.0.desc.dump();
406 4 :
407 4 : if verbose {
408 : // for now, unconditionally download everything, even if that might not be wanted.
409 4 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
410 8 : l.dump(&self.0, ctx).await?
411 0 : }
412 :
413 4 : Ok(())
414 4 : }
415 :
416 : /// Waits until this layer has been dropped (and if needed, local file deletion and remote
417 : /// deletion scheduling has completed).
418 : ///
419 : /// Does not start local deletion, use [`Self::delete_on_drop`] for that
420 : /// separatedly.
421 : #[cfg(any(feature = "testing", test))]
422 2 : pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
423 2 : let mut rx = self.0.status.as_ref().unwrap().subscribe();
424 :
425 2 : async move {
426 : loop {
427 6 : if rx.changed().await.is_err() {
428 2 : break;
429 0 : }
430 : }
431 2 : }
432 2 : }
433 : }
434 :
435 : /// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
436 : ///
437 : /// However, when we want something evicted, we cannot evict it right away as there might be
438 : /// reads currently happening on it. For example: it has been found with [`LayerMap::search`] but
439 : /// not yet read with [`Layer::get_value_reconstruct_data`].
440 : ///
441 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
442 : #[derive(Debug)]
443 : enum ResidentOrWantedEvicted {
444 : Resident(Arc<DownloadedLayer>), // strong reference
445 : WantedEvicted(Weak<DownloadedLayer>, usize), // weak reference and the `version` of the `DownloadedLayer` it was downgraded from
446 : }
447 :
448 : impl ResidentOrWantedEvicted {
449 : /// Non-mutating access to the a DownloadedLayer, if possible.
450 : ///
451 : /// This is not used on the read path (anything that calls
452 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
453 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
454 : #[cfg(test)]
455 14 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
456 14 : match self {
457 14 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
458 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
459 : }
460 14 : }
461 :
462 : /// Best-effort query for residency right now, not as strong guarantee as receiving a strong
463 : /// reference from `ResidentOrWantedEvicted::get`.
464 38 : fn is_likely_resident(&self) -> bool {
465 38 : match self {
466 32 : ResidentOrWantedEvicted::Resident(_) => true,
467 6 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
468 : }
469 38 : }
470 :
471 : /// Upgrades any weak to strong if possible.
472 : ///
473 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
474 : /// happened.
475 124248 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
476 124248 : match self {
477 124240 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
478 8 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
479 0 : Some(strong) => {
480 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
481 0 :
482 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
483 0 :
484 0 : Some((strong, true))
485 : }
486 8 : None => None,
487 : },
488 : }
489 124248 : }
490 :
491 : /// When eviction is first requested, drop down to holding a [`Weak`].
492 : ///
493 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
494 : /// drop the possibly last strong reference outside of the mutex of
495 : /// [`heavier_once_cell::OnceCell`].
496 30 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
497 30 : match self {
498 26 : ResidentOrWantedEvicted::Resident(strong) => {
499 26 : let weak = Arc::downgrade(strong);
500 26 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
501 26 : std::mem::swap(self, &mut temp);
502 26 : match temp {
503 26 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
504 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
505 : }
506 : }
507 4 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
508 : }
509 30 : }
510 : }
511 :
512 : struct LayerInner {
513 : /// Only needed to check ondemand_download_behavior_treat_error_as_warn and for the creation of
514 : /// [`Self::path`].
515 : conf: &'static PageServerConf,
516 :
517 : /// Full path to the file; unclear if this should exist anymore.
518 : path: Utf8PathBuf,
519 :
520 : desc: PersistentLayerDesc, // what `AsLayerDesc::layer_desc` returns
521 :
522 : /// Timeline access is needed for remote timeline client and metrics.
523 : ///
524 : /// There should not be an access to timeline for any reason without entering the
525 : /// [`Timeline::gate`] at the same time.
526 : timeline: Weak<Timeline>,
527 :
528 : /// Cached knowledge of [`Timeline::remote_client`] being `Some`.
529 : have_remote_client: bool,
530 :
531 : access_stats: LayerAccessStats,
532 :
533 : /// This custom OnceCell is backed by std mutex, but only held for short time periods.
534 : ///
535 : /// Filesystem changes (download, evict) are only done while holding a permit which the
536 : /// `heavier_once_cell` provides.
537 : ///
538 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
539 : /// possibly read while not holding it.
540 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
541 :
542 : /// Do we want to delete this locally and remotely when `LayerInner` is dropped
543 : wanted_deleted: AtomicBool,
544 :
545 : /// Version is to make sure we will only evict a specific initialization of the downloaded file.
546 : ///
547 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
548 : /// `ResidentOrWantedEvicted::WantedEvicted`.
549 : version: AtomicUsize,
550 :
551 : /// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
552 : /// starts, or completes.
553 : ///
554 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
555 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
556 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
557 : /// [`ResidentOrWantedEvicted::Resident`] on access.
558 : ///
559 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
560 : status: Option<tokio::sync::watch::Sender<Status>>,
561 :
562 : /// Counter for exponential backoff with the download.
563 : ///
564 : /// This is atomic only for the purposes of having additional data only accessed while holding
565 : /// the InitPermit.
566 : consecutive_failures: AtomicUsize,
567 :
568 : /// The generation of this Layer.
569 : ///
570 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
571 : /// for created layers from [`Timeline::generation`].
572 : generation: Generation,
573 :
574 : /// The shard of this Layer.
575 : ///
576 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
577 : /// current `ShardIdentity` (TODO: add link once it's introduced).
578 : ///
579 : /// For loaded layers, this may be some other value if the tenant has undergone
580 : /// a shard split since the layer was originally written.
581 : shard: ShardIndex,
582 :
583 : /// When the Layer was last evicted but has not been downloaded since.
584 : ///
585 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
586 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
587 :
588 : #[cfg(test)]
589 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
590 : }
591 :
592 : impl std::fmt::Display for LayerInner {
593 30 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
594 30 : write!(f, "{}", self.layer_desc().short_id())
595 30 : }
596 : }
597 :
598 : impl AsLayerDesc for LayerInner {
599 383456 : fn layer_desc(&self) -> &PersistentLayerDesc { // borrows the description stored at construction
600 383456 : &self.desc
601 383456 : }
602 : }
603 :
604 : #[derive(Debug, Clone, Copy)]
605 : enum Status { // value carried by the `LayerInner::status` watch channel
606 : Resident, // initial value when constructed with a downloaded layer; `evict_and_wait` only proceeds from this
607 : Evicted, // initial value when constructed without a downloaded layer
608 : Downloading, // `evict_and_wait` treats this like `Evicted`
609 : }
610 :
611 : impl Drop for LayerInner {
612 486 : fn drop(&mut self) { // balances eviction metrics, then, only if `delete_on_drop` was requested, deletes the file on a blocking task
613 : // if there was a pending eviction, mark it cancelled here to balance metrics
614 486 : if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
615 2 : {
616 2 : // eviction has already been started
617 2 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
618 2 :
619 2 : // eviction request is intentionally not honored as no one is present to wait for it
620 2 : // and we could be delaying shutdown for nothing.
621 484 : }
622 :
623 486 : if !*self.wanted_deleted.get_mut() { // deletion was never requested: nothing more to do
624 36 : return;
625 450 : }
626 :
627 450 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
628 :
629 450 : let path = std::mem::take(&mut self.path); // everything the closure needs is moved or copied out of `self` here
630 450 : let file_name = self.layer_desc().filename();
631 450 : let file_size = self.layer_desc().file_size;
632 450 : let timeline = self.timeline.clone();
633 450 : let meta = self.metadata();
634 450 : let status = self.status.take();
635 450 :
636 450 : Self::spawn_blocking(move || {
637 450 : let _g = span.entered();
638 450 :
639 450 : // keep the status sender alive until this closure is finished, for [`Layer::wait_drop`] support
640 450 : let _status = status;
641 :
642 450 : let Some(timeline) = timeline.upgrade() else {
643 : // no need to nag that timeline is gone: under normal situation on
644 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
645 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
646 0 : return;
647 : };
648 :
649 450 : let Ok(_guard) = timeline.gate.enter() else { // a gate which cannot be entered is counted the same as a gone timeline
650 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
651 0 : return;
652 : };
653 :
654 450 : let removed = match std::fs::remove_file(path) {
655 448 : Ok(()) => true,
656 2 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
657 2 : // until we no longer do detaches by removing all local files before removing the
658 2 : // tenant from the global map, we will always get these errors even if we knew what
659 2 : // is the latest state.
660 2 : //
661 2 : // we currently do not track the latest state, so we'll also end up here on evicted
662 2 : // layers.
663 2 : false
664 : }
665 0 : Err(e) => {
666 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
667 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
668 0 : false
669 : }
670 : };
671 :
672 450 : if removed { // the metric is only decremented when a file was actually removed
673 448 : timeline.metrics.resident_physical_size_sub(file_size);
674 448 : }
675 450 : if let Some(remote_client) = timeline.remote_client.as_ref() {
676 450 : let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
677 :
678 450 : if let Err(e) = res {
679 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
680 : // demonstrating this deadlock (without spawn_blocking): stop will drop
681 : // queued items, which will have ResidentLayer's, and those drops would try
682 : // to re-entrantly lock the RemoteTimelineClient inner state.
683 0 : if !timeline.is_active() { // logged at a lower level when the timeline is not active
684 0 : tracing::info!("scheduling deletion on drop failed: {e:#}");
685 : } else {
686 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
687 : }
688 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
689 450 : } else {
690 450 : LAYER_IMPL_METRICS.inc_completed_deletes();
691 450 : }
692 0 : }
693 450 : });
694 486 : }
695 : }
696 :
697 : impl LayerInner {
698 984 : fn new(
699 984 : conf: &'static PageServerConf,
700 984 : timeline: &Arc<Timeline>,
701 984 : access_stats: LayerAccessStats,
702 984 : desc: PersistentLayerDesc,
703 984 : downloaded: Option<Arc<DownloadedLayer>>,
704 984 : generation: Generation,
705 984 : shard: ShardIndex,
706 984 : ) -> Self {
707 984 : let path = conf
708 984 : .timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id)
709 984 : .join(desc.filename().to_string());
710 :
711 984 : let (inner, version, init_status) = if let Some(inner) = downloaded {
712 984 : let version = inner.version;
713 984 : let resident = ResidentOrWantedEvicted::Resident(inner);
714 984 : (
715 984 : heavier_once_cell::OnceCell::new(resident),
716 984 : version,
717 984 : Status::Resident,
718 984 : )
719 : } else {
720 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
721 : };
722 :
723 984 : LayerInner {
724 984 : conf,
725 984 : path,
726 984 : desc,
727 984 : timeline: Arc::downgrade(timeline),
728 984 : have_remote_client: timeline.remote_client.is_some(),
729 984 : access_stats,
730 984 : wanted_deleted: AtomicBool::new(false),
731 984 : inner,
732 984 : version: AtomicUsize::new(version),
733 984 : status: Some(tokio::sync::watch::channel(init_status).0),
734 984 : consecutive_failures: AtomicUsize::new(0),
735 984 : generation,
736 984 : shard,
737 984 : last_evicted_at: std::sync::Mutex::default(),
738 984 : #[cfg(test)]
739 984 : failpoints: Default::default(),
740 984 : }
741 984 : }
742 :
743 454 : fn delete_on_drop(&self) {
744 454 : let res =
745 454 : self.wanted_deleted
746 454 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
747 454 :
748 454 : if res.is_ok() {
749 450 : LAYER_IMPL_METRICS.inc_started_deletes();
750 450 : }
751 454 : }
752 :
753 : /// Requests eviction and waits, at most `timeout`, for the next status change. Cancellation safe,
754 : /// however dropping the future and calling this method again might result in a new attempt to evict OR join the previously started attempt.
755 144 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
756 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
757 : assert!(self.have_remote_client);
758 :
759 : let mut rx = self.status.as_ref().unwrap().subscribe();
760 :
761 : { // scope the borrow of the watched value: it is released before anything is awaited
762 : let current = rx.borrow_and_update();
763 : match &*current {
764 : Status::Resident => {
765 : // we might get lucky and evict this; continue
766 : }
767 : Status::Evicted | Status::Downloading => {
768 : // already evicted, or a download is in progress; either way not evictable now
769 : return Err(EvictionError::NotFound);
770 : }
771 : }
772 : }
773 :
774 : let strong = {
775 : match self.inner.get() {
776 : Some(mut either) => either.downgrade(),
777 : None => {
778 : // we already have a scheduled eviction, which just has not gotten to run yet.
779 : // it might still race with a read access, but that could also get cancelled,
780 : // so let's say this is not evictable.
781 : return Err(EvictionError::NotFound);
782 : }
783 : }
784 : };
785 :
786 : if strong.is_some() {
787 : // drop the DownloadedLayer outside of holding the guard
788 : drop(strong);
789 :
790 : // idea here is that only one evicter should ever get to witness a strong reference,
791 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
792 : // cancelled eviction and signal us, like it currently does.
793 : //
794 : // a second concurrent evict_and_wait will not see a strong reference.
795 : LAYER_IMPL_METRICS.inc_started_evictions();
796 : }
797 :
798 : let changed = rx.changed();
799 : let changed = tokio::time::timeout(timeout, changed).await;
800 :
801 : let Ok(changed) = changed else { // no status change within `timeout`
802 : return Err(EvictionError::Timeout);
803 : };
804 :
805 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
806 :
807 : let current = rx.borrow_and_update();
808 :
809 : match &*current {
810 : // the easiest case
811 : Status::Evicted => Ok(()),
812 : // it surely was evicted in between, but then there was a new access now; we can't know
813 : // if it'll succeed so let's just call it evicted
814 : Status::Downloading => Ok(()),
815 : // either the download which was started after eviction completed already, or it was
816 : // never evicted
817 : Status::Resident => Err(EvictionError::Downloaded),
818 : }
819 : }
820 :
821 : /// Cancellation safe.
///
/// Returns a strong reference to the `DownloadedLayer`, initializing `self.inner` when needed.
///
/// Paths visible in the body, in order:
/// 1. `self.inner` is initialized and `get_and_upgrade` yields a strong reference: return it.
///    When the `upgraded` flag is set, `Status::Resident` is published first and a cancelled
///    eviction is counted.
/// 2. Otherwise the init permit is held and the file is checked with `needs_download`; if
///    nothing is needed the layer is initialized in place.
/// 3. Otherwise the file is downloaded through `download_init_and_wait`.
///
/// # Errors
///
/// - `TimelineShutdown` when the `Weak` timeline reference can no longer be upgraded
/// - `PreStatFailed` when the stat inside `needs_download` fails
/// - `NotFile` when the path exists but is not a regular file
/// - `NoRemoteStorage` when the timeline has no remote client
/// - whatever `check_expected_download` returns, when `ctx` is `Some`
/// - `DownloadRequired` when a download is needed but `allow_download` is false
/// - any error of `download_init_and_wait`
822 124256 : async fn get_or_maybe_download(
823 124256 : self: &Arc<Self>,
824 124256 : allow_download: bool,
825 124256 : ctx: Option<&RequestContext>,
826 124256 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
827 16 : let (weak, permit) = {
828 : // get_or_init_detached can:
829 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
830 : // - be slow (wait for semaphore permit or closing)
// the guard's closure only runs if this future is dropped before `into_inner` below
831 124256 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
832 :
833 124256 : let locked = self
834 124256 : .inner
835 124256 : .get_or_init_detached()
836 2 : .await
837 124256 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
838 124256 :
// the await completed: defuse the guard so the cancellation metric is not bumped
839 124256 : scopeguard::ScopeGuard::into_inner(init_cancelled);
840 :
841 124240 : match locked {
842 : // this path could have been a RwLock::read
843 124240 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
844 0 : Ok(Ok((strong, _))) => {
845 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
846 0 : // previously a `evict_and_wait` was received. this is the only place when we
847 0 : // send out an update without holding the InitPermit.
848 0 : //
849 0 : // note that we also have dropped the Guard; this is fine, because we just made
850 0 : // a state change and are holding a strong reference to be returned.
851 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
852 0 : LAYER_IMPL_METRICS
853 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
854 0 :
855 0 : return Ok(strong);
856 : }
857 8 : Ok(Err(guard)) => {
858 8 : // path to here: we won the eviction, the file should still be on the disk.
859 8 : let (weak, permit) = guard.take_and_deinit();
860 8 : (Some(weak), permit)
861 : }
// the cell was not initialized: we only got the permit to initialize it
862 8 : Err(permit) => (None, permit),
863 : }
864 : };
865 :
866 16 : if let Some(weak) = weak {
867 : // only drop the weak after dropping the heavier_once_cell guard
868 8 : assert!(
869 8 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
870 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
871 : );
872 8 : }
873 :
874 16 : let timeline = self
875 16 : .timeline
876 16 : .upgrade()
877 16 : .ok_or_else(|| DownloadError::TimelineShutdown)?;
878 :
879 : // count cancellations, which currently remain largely unexpected
880 16 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
881 :
882 : // check if we really need to be downloaded: this can happen if a read access won the
883 : // semaphore before eviction.
884 : //
885 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
886 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
887 16 : let needs_download = self
888 16 : .needs_download()
889 16 : .await
890 16 : .map_err(DownloadError::PreStatFailed);
891 16 :
// defuse the guard before `?` so that a stat error is not counted as a cancellation
892 16 : scopeguard::ScopeGuard::into_inner(init_cancelled);
893 :
894 16 : let needs_download = needs_download?;
895 :
896 16 : let Some(reason) = needs_download else {
897 : // the file is present locally because eviction has not had a chance to run yet
898 :
899 : #[cfg(test)]
900 8 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
901 2 : .await?;
902 :
903 6 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
904 6 :
905 6 : return Ok(self.initialize_after_layer_is_on_disk(permit));
906 : };
907 :
908 : // we must download; getting cancelled before spawning the download is not an issue as
909 : // any still running eviction would not find anything to evict.
910 :
911 8 : if let NeedsDownload::NotFile(ft) = reason {
912 0 : return Err(DownloadError::NotFile(ft));
913 8 : }
914 8 :
915 8 : if timeline.remote_client.as_ref().is_none() {
916 0 : return Err(DownloadError::NoRemoteStorage);
917 8 : }
918 :
919 8 : if let Some(ctx) = ctx {
920 2 : self.check_expected_download(ctx)?;
921 6 : }
922 :
923 8 : if !allow_download {
924 : // this is only used from tests, but it is hard to test without the boolean
925 2 : return Err(DownloadError::DownloadRequired);
926 6 : }
927 :
928 6 : async move {
929 6 : tracing::info!(%reason, "downloading on-demand");
930 :
931 6 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
932 10 : let res = self.download_init_and_wait(timeline, permit).await?;
933 6 : scopeguard::ScopeGuard::into_inner(init_cancelled);
934 6 : Ok(res)
935 6 : }
936 6 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
937 10 : .await
938 124256 : }
939 :
940 : /// Nag or fail per RequestContext policy
///
/// - `Download`: always `Ok(())`.
/// - `Warn` or `Error`: logs at info level and increments `UNEXPECTED_ONDEMAND_DOWNLOADS`.
///   The result is `Err(ContextAndConfigReallyDeniesDownloads)` only when the behavior is
///   `Error` and `conf.ondemand_download_behavior_treat_error_as_warn` is false.
941 2 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
942 2 : use crate::context::DownloadBehavior::*;
943 2 : let b = ctx.download_behavior();
944 2 : match b {
945 2 : Download => Ok(()),
946 : Warn | Error => {
947 0 : tracing::info!(
948 0 : "unexpectedly on-demand downloading for task kind {:?}",
949 0 : ctx.task_kind()
950 0 : );
951 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
952 :
953 0 : let really_error =
954 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
955 :
956 0 : if really_error {
957 : // this check is only probabilistic, seems like a flakiness footgun
958 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
959 : } else {
960 0 : Ok(())
961 : }
962 : }
963 : }
964 2 : }
965 :
966 : /// Actual download, at most one is executed at the time.
///
/// The download itself runs in a task started with `Self::spawn`; this function only waits
/// for the result on a oneshot channel. The spawned task keeps the timeline gate guard,
/// publishes `Status::Downloading`, runs `download_and_init` and sends back its result. If the
/// receiver is already gone, the task records that in metrics instead.
///
/// # Errors
///
/// - `DownloadCancelled` when the timeline gate cannot be entered, when the failure
///   downcasts to `remote_storage::DownloadError::Cancelled`, or when the sender is dropped
///   without sending
/// - `DownloadFailed` for every other download failure (the cause is not embedded)
967 6 : async fn download_init_and_wait(
968 6 : self: &Arc<Self>,
969 6 : timeline: Arc<Timeline>,
970 6 : permit: heavier_once_cell::InitPermit,
971 6 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
972 6 : debug_assert_current_span_has_tenant_and_timeline_id();
973 6 :
974 6 : let (tx, rx) = tokio::sync::oneshot::channel();
975 6 :
976 6 : let this: Arc<Self> = self.clone();
977 :
978 6 : let guard = timeline
979 6 : .gate
980 6 : .enter()
981 6 : .map_err(|_| DownloadError::DownloadCancelled)?;
982 :
983 6 : Self::spawn(
984 6 : async move {
// moved into the task so the gate guard lives until the task finishes
985 0 : let _guard = guard;
986 0 :
987 0 : // now that we have committed to downloading, send out an update to:
988 0 : // - unhang any pending eviction
989 0 : // - break out of evict_and_wait
990 0 : this.status
991 0 : .as_ref()
992 0 : .unwrap()
993 0 : .send_replace(Status::Downloading);
994 6 :
995 6 : #[cfg(test)]
996 6 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
997 2 : .await
998 6 : .unwrap();
999 :
1000 93 : let res = this.download_and_init(timeline, permit).await;
1001 :
// `send` hands the value back as `Err` when the receiving side has been dropped
1002 6 : if let Err(res) = tx.send(res) {
1003 0 : match res {
1004 0 : Ok(_res) => {
1005 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1006 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1007 : }
1008 0 : Err(e) => {
1009 0 : tracing::info!(
1010 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1011 0 : );
1012 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1013 : }
1014 : }
1015 6 : }
1016 6 : }
1017 6 : .in_current_span(),
1018 6 : );
1019 6 :
1020 10 : match rx.await {
1021 6 : Ok(Ok(res)) => Ok(res),
1022 0 : Ok(Err(e)) => {
1023 0 : // sleep already happened in the spawned task, if it was not cancelled
1024 0 : match e.downcast_ref::<remote_storage::DownloadError>() {
1025 : // If the download failed due to its cancellation token,
1026 : // propagate the cancellation error upstream.
1027 : Some(remote_storage::DownloadError::Cancelled) => {
1028 0 : Err(DownloadError::DownloadCancelled)
1029 : }
1030 : // FIXME: this is not embedding the error because historically it would have
1031 : // been output to compute, however that is no longer the case.
1032 0 : _ => Err(DownloadError::DownloadFailed),
1033 : }
1034 : }
// the sender was dropped without sending a result
1035 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1036 : }
1037 6 : }
1038 :
/// Downloads the layer file through the timeline's remote client and, on success,
/// initializes `self.inner` via `initialize_after_layer_is_on_disk`.
///
/// On success it also adds to the resident size metric, resets `consecutive_failures`,
/// records the time since the last eviction (if any) and records a residence event.
///
/// On failure it bumps `consecutive_failures`, logs the error and then waits for either an
/// exponential backoff or timeline cancellation before returning the original error.
///
/// # Panics
///
/// - when the timeline has no remote client (callers are expected to have checked)
/// - when the downloaded size differs from `self.desc.file_size`
/// - when `needs_download` still reports a reason, or errors, after a successful download
1039 6 : async fn download_and_init(
1040 6 : self: &Arc<LayerInner>,
1041 6 : timeline: Arc<Timeline>,
1042 6 : permit: heavier_once_cell::InitPermit,
1043 6 : ) -> anyhow::Result<Arc<DownloadedLayer>> {
1044 6 : let client = timeline
1045 6 : .remote_client
1046 6 : .as_ref()
1047 6 : .expect("checked before download_init_and_wait");
1048 :
1049 6 : let result = client
1050 6 : .download_layer_file(&self.desc.filename(), &self.metadata(), &timeline.cancel)
1051 87 : .await;
1052 :
1053 6 : match result {
1054 6 : Ok(size) => {
1055 6 : assert_eq!(size, self.desc.file_size);
1056 :
1057 6 : match self.needs_download().await {
1058 0 : Ok(Some(reason)) => {
1059 0 : // this is really a bug in needs_download or remote timeline client
1060 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1061 : }
1062 6 : Ok(None) => {
1063 6 : // as expected
1064 6 : }
1065 0 : Err(e) => {
1066 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1067 : }
1068 : }
1069 :
1070 6 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1071 6 : timeline
1072 6 : .metrics
1073 6 : .resident_physical_size_add(self.desc.file_size);
1074 6 : self.consecutive_failures.store(0, Ordering::Relaxed);
1075 6 :
// `take()` clears the stored eviction timestamp, so it is reported at most once
1076 6 : let since_last_eviction = self
1077 6 : .last_evicted_at
1078 6 : .lock()
1079 6 : .unwrap()
1080 6 : .take()
1081 6 : .map(|ts| ts.elapsed());
1082 6 : if let Some(since_last_eviction) = since_last_eviction {
1083 6 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1084 6 : }
1085 :
1086 6 : self.access_stats.record_residence_event(
1087 6 : LayerResidenceStatus::Resident,
1088 6 : LayerResidenceEventReason::ResidenceChange,
1089 6 : );
1090 6 :
1091 6 : Ok(self.initialize_after_layer_is_on_disk(permit))
1092 : }
1093 0 : Err(e) => {
// fetch_add returns the previous value, so add one to get the current failure count
1094 0 : let consecutive_failures =
1095 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1096 0 :
1097 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1098 :
// NOTE(review): 1.5 and 60.0 are presumably the base and the maximum in seconds;
// confirm against utils::backoff. The count is clamped before narrowing to u32.
1099 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1100 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1101 0 : 1.5,
1102 0 : 60.0,
1103 0 : );
1104 0 :
1105 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1106 :
// wait out the backoff, but stop waiting as soon as the timeline is cancelled
1107 0 : tokio::select! {
1108 0 : _ = tokio::time::sleep(backoff) => {},
1109 0 : _ = timeline.cancel.cancelled() => {},
1110 0 : };
1111 :
1112 0 : Err(e)
1113 : }
1114 : }
1115 6 : }
1116 :
1117 : /// Initializes the `Self::inner` to a "resident" state.
1118 : ///
1119 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1120 : /// before calling this method.
1121 : ///
1122 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1123 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
///
/// Consumes the init `permit`, bumps `self.version`, publishes `Status::Resident` and returns
/// the newly created `DownloadedLayer`, a clone of which is stored into `self.inner`.
1124 12 : fn initialize_after_layer_is_on_disk(
1125 12 : self: &Arc<LayerInner>,
1126 12 : permit: heavier_once_cell::InitPermit,
1127 12 : ) -> Arc<DownloadedLayer> {
1128 12 : debug_assert_current_span_has_tenant_and_timeline_id();
1129 12 :
1130 12 : // disable any scheduled but not yet running eviction deletions for this initialization
// fetch_add returns the previous value, hence the `1 +` to get the new version
1131 12 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1132 12 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1133 12 :
1134 12 : let res = Arc::new(DownloadedLayer {
1135 12 : owner: Arc::downgrade(self),
1136 12 : kind: tokio::sync::OnceCell::default(),
1137 12 : version: next_version,
1138 12 : });
1139 12 :
// purely informational: log when other initializers are counted on `self.inner`
1140 12 : let waiters = self.inner.initializer_count();
1141 12 : if waiters > 0 {
1142 0 : tracing::info!(waiters, "completing layer init for other tasks");
1143 12 : }
1144 :
1145 12 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1146 12 :
1147 12 : self.inner.set(value, permit);
1148 12 :
1149 12 : res
1150 12 : }
1151 :
/// Stats `self.path` with `tokio::fs::metadata` to decide whether the file must be downloaded.
///
/// Returns `Ok(None)` when the path is a regular file of the expected size, and
/// `Ok(Some(reason))` otherwise; a missing file maps to `NeedsDownload::NotFound`.
/// Any other I/O error from the stat is returned as is.
1152 24 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1153 24 : match tokio::fs::metadata(&self.path).await {
1154 16 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1155 8 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1156 0 : Err(e) => Err(e),
1157 : }
1158 24 : }
1159 :
/// Blocking counterpart of `needs_download`: the same result mapping, but the stat is done
/// synchronously with `self.path.metadata()`.
1160 24 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1161 24 : match self.path.metadata() {
1162 24 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1163 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1164 0 : Err(e) => Err(e),
1165 : }
1166 24 : }
1167 :
/// Checks already fetched metadata `m` against the layer descriptor.
///
/// Returns `Err(NeedsDownload::NotFile)` when `m` does not describe a regular file,
/// `Err(NeedsDownload::WrongSize)` when the length differs from `self.desc.file_size`, and
/// `Ok(())` otherwise. The file type is checked before the size.
1168 40 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1169 40 : // in future, this should include sha2-256 validation of the file.
1170 40 : if !m.is_file() {
1171 0 : Err(NeedsDownload::NotFile(m.file_type()))
1172 40 : } else if m.len() != self.desc.file_size {
1173 0 : Err(NeedsDownload::WrongSize {
1174 0 : actual: m.len(),
1175 0 : expected: self.desc.file_size,
1176 0 : })
1177 : } else {
1178 40 : Ok(())
1179 : }
1180 40 : }
1181 :
/// Builds the `HistoricLayerInfo` API model for this layer.
///
/// The `remote` field is the negation of `is_likely_resident()` on the current value of
/// `self.inner`; an uninitialized `self.inner` is reported as remote. `reset` is forwarded
/// to `LayerAccessStats::as_api_model`.
1182 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1183 0 : let layer_file_name = self.desc.filename().file_name();
1184 0 :
1185 0 : let resident = self
1186 0 : .inner
1187 0 : .get()
1188 0 : .map(|rowe| rowe.is_likely_resident())
1189 0 : .unwrap_or(false);
1190 0 :
1191 0 : let access_stats = self.access_stats.as_api_model(reset);
1192 0 :
1193 0 : if self.desc.is_delta {
1194 0 : let lsn_range = &self.desc.lsn_range;
1195 0 :
1196 0 : HistoricLayerInfo::Delta {
1197 0 : layer_file_name,
1198 0 : layer_file_size: self.desc.file_size,
1199 0 : lsn_start: lsn_range.start,
1200 0 : lsn_end: lsn_range.end,
1201 0 : remote: !resident,
1202 0 : access_stats,
1203 0 : }
1204 : } else {
// an image layer is described by a single LSN instead of a range
1205 0 : let lsn = self.desc.image_layer_lsn();
1206 0 :
1207 0 : HistoricLayerInfo::Image {
1208 0 : layer_file_name,
1209 0 : layer_file_size: self.desc.file_size,
1210 0 : lsn_start: lsn,
1211 0 : remote: !resident,
1212 0 : access_stats,
1213 0 : }
1214 : }
1215 0 : }
1216 :
1217 : /// `DownloadedLayer` is being dropped, so it calls this method.
///
/// `only_version` is the version of the dropped `DownloadedLayer`; it is passed on to
/// `wait_for_turn_and_evict`. Without a remote client this only logs an error and returns.
/// Otherwise a task is spawned via `Self::spawn`, under a fresh root span (`parent: None`),
/// which runs the eviction and counts an `Err` outcome as a cancelled eviction.
1218 24 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1219 24 : let can_evict = self.have_remote_client;
1220 :
1221 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1222 : // though here it is very likely
1223 24 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1224 :
1225 24 : if !can_evict {
1226 : // it would be nice to assert this case out, but we are in drop
1227 0 : span.in_scope(|| {
1228 0 : tracing::error!("bug in struct Layer: ResidentOrWantedEvicted has been downgraded while we have no remote storage");
1229 0 : });
1230 0 : return;
1231 24 : }
1232 24 :
1233 24 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1234 24 : // drop while the `self.inner` is being locked, leading to a deadlock.
1235 24 :
1236 24 : let start_evicting = async move {
1237 24 : #[cfg(test)]
1238 24 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1239 14 : .await
1240 24 : .expect("failpoint should not have errored");
1241 24 :
1242 24 : tracing::debug!("eviction started");
1243 :
1244 24 : let res = self.wait_for_turn_and_evict(only_version).await;
1245 : // metrics: ignore the Ok branch, it is not done yet
1246 24 : if let Err(e) = res {
1247 6 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1248 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1249 18 : }
1250 24 : };
1251 :
1252 24 : Self::spawn(start_evicting.instrument(span));
1253 24 : }
1254 :
/// Runs inside the task spawned by `on_downloaded_layer_drop`: waits for the init permit of
/// `self.inner` and, if the layer is still the one to be evicted, hands the removal of the
/// file over to a `spawn_blocking` closure.
///
/// `Ok(())` means the blocking eviction has been spawned, not that it has completed; its
/// outcome is only reported through metrics and logging from within that closure.
///
/// # Errors
///
/// - `TimelineGone` when the timeline cannot be upgraded or its gate cannot be entered
/// - `UnexpectedEvictedState` / `LostToDownload` when the observed status is not `Resident`
/// - `VersionCheckFailed` when `self.inner` holds a `WantedEvicted` of another version
/// - `AlreadyReinitialized` when `self.inner` holds a `Resident` value
1255 24 : async fn wait_for_turn_and_evict(
1256 24 : self: Arc<LayerInner>,
1257 24 : only_version: usize,
1258 24 : ) -> Result<(), EvictionCancelled> {
// eviction may proceed only while the published status is `Resident`
1259 46 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1260 46 : use Status::*;
1261 46 : match status {
1262 44 : Resident => Ok(()),
1263 2 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1264 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1265 : }
1266 46 : }
1267 :
1268 24 : let timeline = self
1269 24 : .timeline
1270 24 : .upgrade()
1271 24 : .ok_or(EvictionCancelled::TimelineGone)?;
1272 :
1273 24 : let mut rx = self
1274 24 : .status
1275 24 : .as_ref()
1276 24 : .expect("LayerInner cannot be dropped, holding strong ref")
1277 24 : .subscribe();
1278 24 :
1279 24 : is_good_to_continue(&rx.borrow_and_update())?;
1280 :
1281 22 : let Ok(_gate) = timeline.gate.enter() else {
1282 0 : return Err(EvictionCancelled::TimelineGone);
1283 : };
1284 :
1285 18 : let permit = {
1286 : // we cannot just `std::fs::remove_file` because there might already be an
1287 : // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
1288 : // operations must be done while holding the heavier_once_cell::InitPermit
1289 22 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1290 :
1291 22 : let waited = loop {
1292 22 : // we must race to the Downloading starting, otherwise we would have to wait until the
1293 22 : // completion of the download. waiting for download could be long and hinder our
1294 22 : // efforts to alert on "hanging" evictions.
1295 22 : tokio::select! {
1296 22 : res = &mut wait => break res,
1297 : _ = rx.changed() => {
1298 : is_good_to_continue(&rx.borrow_and_update())?;
1299 : // two possibilities for Status::Resident:
1300 : // - the layer was found locally from disk by a read
1301 : // - we missed a bunch of updates and now the layer is
1302 : // again downloaded -- assume we'll fail later on with
1303 : // version check or AlreadyReinitialized
1304 : }
1305 22 : }
1306 22 : };
1307 :
1308 : // re-check now that we have the guard or permit; all updates should have happened
1309 : // while holding the permit.
1310 22 : is_good_to_continue(&rx.borrow_and_update())?;
1311 :
1312 : // the term deinitialize is used here, because clearing out the Weak will eventually
1313 : // lead to deallocating the reference counted value, and the value we
1314 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1315 22 : let (_weak, permit) = match waited {
1316 20 : Ok(guard) => {
1317 20 : match &*guard {
1318 18 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1319 18 : if *version == only_version =>
1320 16 : {
1321 16 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1322 16 : let (weak, permit) = guard.take_and_deinit();
1323 16 : (Some(weak), permit)
1324 : }
1325 2 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1326 2 : // if we were not doing the version check, we would need to try to
1327 2 : // upgrade the weak here to see if it really is dropped. version check
1328 2 : // is done instead assuming that it is cheaper.
1329 2 : tracing::debug!(
1330 0 : version,
1331 0 : only_version,
1332 0 : "version mismatch, not deinitializing"
1333 0 : );
1334 2 : return Err(EvictionCancelled::VersionCheckFailed);
1335 : }
1336 : ResidentOrWantedEvicted::Resident(_) => {
1337 2 : return Err(EvictionCancelled::AlreadyReinitialized);
1338 : }
1339 : }
1340 : }
1341 2 : Err(permit) => {
1342 2 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1343 2 : (None, permit)
1344 : }
1345 : };
1346 :
1347 18 : permit
1348 18 : };
1349 18 :
1350 18 : let span = tracing::Span::current();
1351 18 :
1352 18 : let spawned_at = std::time::Instant::now();
1353 18 :
1354 18 : // this is on purpose a detached spawn; we don't need to wait for it
1355 18 : //
1356 18 : // eviction completion reporting is the only thing hinging on this, and it can be just as
1357 18 : // well from a spawn_blocking thread.
1358 18 : //
1359 18 : // important to note that now that we've acquired the permit we have made sure the evicted
1360 18 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1361 18 : // there are multiple evictions. The rest is not cancellable, and we've now committed to
1362 18 : // evicting.
1363 18 : //
1364 18 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1365 18 : // reads. We will need to add cancellation for that if necessary.
1366 18 : Self::spawn_blocking(move || {
1367 18 : let _span = span.entered();
1368 18 :
// `permit` was moved into this closure and is only borrowed by evict_blocking
1369 18 : let res = self.evict_blocking(&timeline, &permit);
1370 18 :
1371 18 : let waiters = self.inner.initializer_count();
1372 18 :
1373 18 : if waiters > 0 {
1374 1 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1375 17 : }
1376 :
1377 18 : let completed_in = spawned_at.elapsed();
1378 18 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1379 18 :
1380 18 : match res {
1381 18 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1382 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1383 : }
1384 :
1385 18 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1386 18 : });
1387 18 :
1388 18 : Ok(())
1389 24 : }
1390 :
1391 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
///
/// Removes the file at `self.path` with `capture_mtime_and_remove`. On success it updates the
/// timeline's eviction metrics, records a residence event, publishes `Status::Evicted` and
/// stores the current instant into `last_evicted_at`.
///
/// `_permit` is not used in the body; taking it as a parameter requires the caller to hold
/// the init permit for the duration of the call.
///
/// # Errors
///
/// - `FileNotFound` when the removal reports `ErrorKind::NotFound`
/// - `RemoveFailed` for any other I/O error
///
/// In both error cases the function returns before the status or metrics are updated.
1392 18 : fn evict_blocking(
1393 18 : &self,
1394 18 : timeline: &Timeline,
1395 18 : _permit: &heavier_once_cell::InitPermit,
1396 18 : ) -> Result<(), EvictionCancelled> {
1397 18 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1398 18 :
1399 18 : match capture_mtime_and_remove(&self.path) {
1400 18 : Ok(local_layer_mtime) => {
// duration_since errs when the mtime is later than the current system time
1401 18 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1402 18 : match duration {
1403 18 : Ok(elapsed) => {
1404 18 : timeline
1405 18 : .metrics
1406 18 : .evictions_with_low_residence_duration
1407 18 : .read()
1408 18 : .unwrap()
1409 18 : .observe(elapsed);
1410 18 : tracing::info!(
1411 18 : residence_millis = elapsed.as_millis(),
1412 18 : "evicted layer after known residence period"
1413 18 : );
1414 : }
1415 : Err(_) => {
1416 0 : tracing::info!("evicted layer after unknown residence period");
1417 : }
1418 : }
1419 18 : timeline.metrics.evictions.inc();
1420 18 : timeline
1421 18 : .metrics
1422 18 : .resident_physical_size_sub(self.desc.file_size);
1423 : }
1424 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1425 0 : tracing::error!(
1426 0 : layer_size = %self.desc.file_size,
1427 0 : "failed to evict layer from disk, it was already gone"
1428 0 : );
1429 0 : return Err(EvictionCancelled::FileNotFound);
1430 : }
1431 0 : Err(e) => {
1432 0 : // FIXME: this should probably be an abort
1433 0 : tracing::error!("failed to evict file from disk: {e:#}");
1434 0 : return Err(EvictionCancelled::RemoveFailed);
1435 : }
1436 : }
1437 :
1438 18 : self.access_stats.record_residence_event(
1439 18 : LayerResidenceStatus::Evicted,
1440 18 : LayerResidenceEventReason::ResidenceChange,
1441 18 : );
1442 18 :
1443 18 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1444 18 :
// read back (and cleared) by download_and_init to report time until redownload
1445 18 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1446 18 :
1447 18 : Ok(())
1448 18 : }
1449 :
/// Builds a `LayerFileMetadata` from the descriptor's file size and this layer's
/// generation and shard.
1450 1412 : fn metadata(&self) -> LayerFileMetadata {
1451 1412 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1452 1412 : }
1453 :
1454 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1455 : ///
1456 : /// Synchronizing with spawned tasks is very complicated otherwise.
///
/// The returned join handle is discarded in both configurations, so the task is detached.
1457 30 : fn spawn<F>(fut: F)
1458 30 : where
1459 30 : F: std::future::Future<Output = ()> + Send + 'static,
1460 30 : {
1461 30 : #[cfg(test)]
1462 30 : tokio::task::spawn(fut);
1463 30 : #[cfg(not(test))]
1464 30 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1465 30 : }
1466 :
1467 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
///
/// Blocking counterpart of `spawn`; the returned join handle is likewise discarded.
1468 468 : fn spawn_blocking<F>(f: F)
1469 468 : where
1470 468 : F: FnOnce() + Send + 'static,
1471 468 : {
1472 468 : #[cfg(test)]
1473 468 : tokio::task::spawn_blocking(f);
1474 468 : #[cfg(not(test))]
1475 468 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1476 468 : }
1477 : }
1478 :
/// Reads the modification time of `path`, removes the file and returns that modification time.
///
/// The metadata is read before the removal. An I/O error from any of the three steps (stat,
/// reading the mtime, removal) is returned unchanged; the file is only removed when the first
/// two steps succeeded.
1479 18 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1480 18 : let m = path.metadata()?;
1481 18 : let local_layer_mtime = m.modified()?;
1482 18 : std::fs::remove_file(path)?;
1483 18 : Ok(local_layer_mtime)
1484 18 : }
1485 :
/// Reasons why waiting for an eviction (see `evict_and_wait`) did not end with the layer
/// being evicted.
1486 0 : #[derive(Debug, thiserror::Error)]
1487 : pub(crate) enum EvictionError {
/// There was nothing to evict.
1488 : #[error("layer was already evicted")]
1489 : NotFound,
1490 :
1491 : /// Evictions must always lose to downloads in races, and this time it happened.
1492 : #[error("layer was downloaded instead")]
1493 : Downloaded,
1494 :
/// No status change was observed within the timeout given by the caller.
1495 : #[error("eviction did not happen within timeout")]
1496 : Timeout,
1497 : }
1498 :
1499 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1500 0 : #[derive(Debug, thiserror::Error)]
1501 : pub(crate) enum DownloadError {
/// The `Weak` reference to the timeline could not be upgraded.
1502 : #[error("timeline has already shutdown")]
1503 : TimelineShutdown,
/// The timeline has no remote client to download with.
1504 : #[error("no remote storage configured")]
1505 : NoRemoteStorage,
/// The request context's download behavior is `Error` and the configuration does not
/// downgrade it to a warning.
1506 : #[error("context denies downloading")]
1507 : ContextAndConfigReallyDeniesDownloads,
/// A download would be needed, but the caller passed `allow_download == false`.
1508 : #[error("downloading is really required but not allowed by this method")]
1509 : DownloadRequired,
/// The layer path exists but is not a regular file.
1510 : #[error("layer path exists, but it is not a file: {0:?}")]
1511 : NotFile(std::fs::FileType),
1512 : /// Why no error here? Because it will be reported by page_service. We should have also done
1513 : /// retries already.
1514 : #[error("downloading evicted layer file failed")]
1515 : DownloadFailed,
/// The timeline gate was closed, the download reported cancellation, or the download task
/// went away without reporting a result.
1516 : #[error("downloading failed, possibly for shutdown")]
1517 : DownloadCancelled,
/// The stat done before deciding on a download failed with the wrapped I/O error.
1518 : #[error("pre-condition: stat before download failed")]
1519 : PreStatFailed(#[source] std::io::Error),
1520 :
/// Error injected by a test failpoint; only present in test builds.
1521 : #[cfg(test)]
1522 : #[error("failpoint: {0:?}")]
1523 : Failpoint(failpoints::FailpointKind),
1524 : }
1525 :
/// Reason why the local layer file cannot be used as is, as reported by
/// `LayerInner::needs_download`.
1526 : #[derive(Debug, PartialEq)]
1527 : pub(crate) enum NeedsDownload {
/// Nothing exists at the layer path.
1528 : NotFound,
/// Something other than a regular file exists at the layer path.
1529 : NotFile(std::fs::FileType),
/// The file length (`actual`) differs from the size in the layer descriptor (`expected`).
1530 : WrongSize { actual: u64, expected: u64 },
1531 : }
1532 :
/// Human readable form of the reason; `get_or_maybe_download` logs it with `%reason`.
1533 : impl std::fmt::Display for NeedsDownload {
1534 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1535 6 : match self {
1536 6 : NeedsDownload::NotFound => write!(f, "file was not found"),
1537 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1538 0 : NeedsDownload::WrongSize { actual, expected } => {
1539 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1540 : }
1541 : }
1542 6 : }
1543 : }
1544 :
1545 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1546 : pub(crate) struct DownloadedLayer {
// back reference to the owning LayerInner; upgraded in `Drop` to notify the owner
1547 : owner: Weak<LayerInner>,
1548 : // Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
1549 : // DownloadedLayer
1550 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
// value of the owner's version counter at initialization; handed to
// `on_downloaded_layer_drop` so that eviction can check it targets this instance
1551 : version: usize,
1552 : }
1553 :
/// Manual `Debug` which prints only `kind` and `version`.
1554 : impl std::fmt::Debug for DownloadedLayer {
1555 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1556 0 : f.debug_struct("DownloadedLayer")
1557 0 : // owner omitted because it is always "Weak"
1558 0 : .field("kind", &self.kind)
1559 0 : .field("version", &self.version)
1560 0 : .finish()
1561 0 : }
1562 : }
1563 :
/// Dropping the `DownloadedLayer` notifies the owning `LayerInner`, if it is still alive, by
/// calling `on_downloaded_layer_drop` with this instance's version.
1564 : impl Drop for DownloadedLayer {
1565 508 : fn drop(&mut self) {
1566 508 : if let Some(owner) = self.owner.upgrade() {
1567 24 : owner.on_downloaded_layer_drop(self.version);
1568 484 : } else {
1569 484 : // Layer::drop will handle cancelling the eviction; because of drop order and
1570 484 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1571 484 : }
1572 508 : }
1573 : }
1574 :
1575 : impl DownloadedLayer {
1576 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`], or fails to
1577 : /// initialize it permanently.
1578 : ///
1579 : /// `owner` parameter is a strong reference at the same `LayerInner` as the
1580 : /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
1581 : /// we will always have the LayerInner on the callstack, so we can just use it.
///
/// The loaders return a nested result: an outer `Err` is treated as a permanent failure and
/// is stored in `self.kind`, so later calls report it again without retrying. An inner `Err`
/// is treated as transient and is returned without initializing `self.kind`, so a later call
/// runs the load again.
///
/// # Panics
///
/// Panics if `owner` is not the same allocation as `self.owner`.
1582 124274 : async fn get<'a>(
1583 124274 : &'a self,
1584 124274 : owner: &Arc<LayerInner>,
1585 124274 : ctx: &RequestContext,
1586 124274 : ) -> anyhow::Result<&'a LayerKind> {
1587 124274 : let init = || async {
1588 630 : assert_eq!(
1589 630 : Weak::as_ptr(&self.owner),
1590 630 : Arc::as_ptr(owner),
1591 0 : "these are the same, just avoiding the upgrade"
1592 : );
1593 :
1594 630 : let res = if owner.desc.is_delta {
1595 606 : let summary = Some(delta_layer::Summary::expected(
1596 606 : owner.desc.tenant_shard_id.tenant_id,
1597 606 : owner.desc.timeline_id,
1598 606 : owner.desc.key_range.clone(),
1599 606 : owner.desc.lsn_range.clone(),
1600 606 : ));
1601 606 : delta_layer::DeltaLayerInner::load(
1602 606 : &owner.path,
1603 606 : summary,
1604 606 : Some(owner.conf.max_vectored_read_bytes),
1605 606 : ctx,
1606 606 : )
1607 610 : .await
1608 606 : .map(|res| res.map(LayerKind::Delta))
1609 : } else {
1610 24 : let lsn = owner.desc.image_layer_lsn();
1611 24 : let summary = Some(image_layer::Summary::expected(
1612 24 : owner.desc.tenant_shard_id.tenant_id,
1613 24 : owner.desc.timeline_id,
1614 24 : owner.desc.key_range.clone(),
1615 24 : lsn,
1616 24 : ));
1617 24 : image_layer::ImageLayerInner::load(
1618 24 : &owner.path,
1619 24 : lsn,
1620 24 : summary,
1621 24 : Some(owner.conf.max_vectored_read_bytes),
1622 24 : ctx,
1623 24 : )
1624 24 : .await
1625 24 : .map(|res| res.map(LayerKind::Image))
1626 : };
1627 :
// reshape for get_or_try_init: returning `Err` keeps the cell uninitialized (transient),
// returning `Ok(Err(..))` stores the failure in the cell (permanent)
1628 630 : match res {
1629 630 : Ok(Ok(layer)) => Ok(Ok(layer)),
1630 0 : Ok(Err(transient)) => Err(transient),
1631 0 : Err(permanent) => {
1632 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1633 0 : // TODO(#5815): we are not logging all errors, so temporarily log them **once**
1634 0 : // here as well
1635 0 : let permanent = permanent.context("load layer");
1636 0 : tracing::error!("layer loading failed permanently: {permanent:#}");
1637 0 : Ok(Err(permanent))
1638 : }
1639 : }
1640 1260 : };
1641 124274 : self.kind
1642 124274 : .get_or_try_init(init)
1643 : // return transient errors using `?`
1644 634 : .await?
1645 124274 : .as_ref()
1646 124274 : .map_err(|e| {
1647 0 : // errors are not cloneable, so all we can do is stringify
1648 0 : // test_broken_timeline matches this string
1649 0 : anyhow::anyhow!("layer loading failed: {e:#}")
1650 124274 : })
1651 124274 : }
1652 :
/// Loads the layer via `get` if needed and forwards the single-key read to the delta or
/// image implementation. `lsn_range` is only passed on for delta layers.
1653 123768 : async fn get_value_reconstruct_data(
1654 123768 : &self,
1655 123768 : key: Key,
1656 123768 : lsn_range: Range<Lsn>,
1657 123768 : reconstruct_data: &mut ValueReconstructState,
1658 123768 : owner: &Arc<LayerInner>,
1659 123768 : ctx: &RequestContext,
1660 123768 : ) -> anyhow::Result<ValueReconstructResult> {
1661 123768 : use LayerKind::*;
1662 123768 :
1663 123768 : match self.get(owner, ctx).await? {
1664 123242 : Delta(d) => {
1665 123242 : d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
1666 22699 : .await
1667 : }
1668 526 : Image(i) => {
1669 526 : i.get_value_reconstruct_data(key, reconstruct_data, ctx)
1670 458 : .await
1671 : }
1672 : }
1673 123768 : }
1674 :
/// Vectored variant of `get_value_reconstruct_data`: forwards a whole `keyspace` read to the
/// delta or image implementation. A failure of `get` is converted into `GetVectoredError`.
1675 18 : async fn get_values_reconstruct_data(
1676 18 : &self,
1677 18 : keyspace: KeySpace,
1678 18 : lsn_range: Range<Lsn>,
1679 18 : reconstruct_data: &mut ValuesReconstructState,
1680 18 : owner: &Arc<LayerInner>,
1681 18 : ctx: &RequestContext,
1682 18 : ) -> Result<(), GetVectoredError> {
1683 18 : use LayerKind::*;
1684 18 :
1685 18 : match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
1686 18 : Delta(d) => {
1687 18 : d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
1688 24 : .await
1689 : }
1690 0 : Image(i) => {
1691 0 : i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
1692 0 : .await
1693 : }
1694 : }
1695 18 : }
1696 :
/// Loads the layer via `get` if needed and calls `dump` on the delta or image implementation.
1697 4 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1698 4 : use LayerKind::*;
1699 4 : match self.get(owner, ctx).await? {
1700 4 : Delta(d) => d.dump(ctx).await?,
1701 0 : Image(i) => i.dump(ctx).await?,
1702 : }
1703 :
1704 4 : Ok(())
1705 4 : }
1706 : }
1707 :
1708 : /// Wrapper around an actual layer implementation.
///
/// Which variant gets constructed is decided in `DownloadedLayer::get` from the layer
/// descriptor's `is_delta` flag.
1709 : #[derive(Debug)]
1710 : enum LayerKind {
1711 : Delta(delta_layer::DeltaLayerInner),
1712 : Image(image_layer::ImageLayerInner),
1713 : }
1714 :
1715 : /// Guard for forcing a layer be resident while it exists.
1716 : #[derive(Clone)]
1717 : pub(crate) struct ResidentLayer {
1718 : owner: Layer,
1719 : downloaded: Arc<DownloadedLayer>,
1720 : }
1721 :
1722 : impl std::fmt::Display for ResidentLayer {
1723 1398 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1724 1398 : write!(f, "{}", self.owner)
1725 1398 : }
1726 : }
1727 :
1728 : impl std::fmt::Debug for ResidentLayer {
1729 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1730 0 : write!(f, "{}", self.owner)
1731 0 : }
1732 : }
1733 :
1734 : impl ResidentLayer {
1735 : /// Release the eviction guard, converting back into a plain [`Layer`].
1736 : ///
1737 : /// You can access the [`Layer`] also by using `as_ref`.
1738 460 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1739 460 : self.into()
1740 460 : }
1741 :
1742 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1743 1768 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1744 : pub(crate) async fn load_keys<'a>(
1745 : &'a self,
1746 : ctx: &RequestContext,
1747 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1748 : use LayerKind::*;
1749 :
1750 : let owner = &self.owner.0;
1751 :
1752 : match self.downloaded.get(owner, ctx).await? {
1753 : Delta(ref d) => {
1754 : owner
1755 : .access_stats
1756 : .record_access(LayerAccessKind::KeyIter, ctx);
1757 :
1758 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1759 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1760 : // while it's being held.
1761 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1762 : .await
1763 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1764 : }
1765 : Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
1766 : }
1767 : }
1768 :
1769 : /// FIXME: truncate is bad name because we are not truncating anything, but copying the
1770 : /// filtered parts.
1771 : #[cfg(test)]
1772 10 : pub(super) async fn copy_delta_prefix(
1773 10 : &self,
1774 10 : writer: &mut super::delta_layer::DeltaLayerWriter,
1775 10 : truncate_at: Lsn,
1776 10 : ctx: &RequestContext,
1777 10 : ) -> anyhow::Result<()> {
1778 10 : use LayerKind::*;
1779 10 :
1780 10 : let owner = &self.owner.0;
1781 10 :
1782 10 : match self.downloaded.get(owner, ctx).await? {
1783 10 : Delta(ref d) => d
1784 10 : .copy_prefix(writer, truncate_at, ctx)
1785 13 : .await
1786 10 : .with_context(|| format!("truncate {self}")),
1787 0 : Image(_) => anyhow::bail!(format!("cannot truncate image layer {self}")),
1788 : }
1789 10 : }
1790 :
1791 851 : pub(crate) fn local_path(&self) -> &Utf8Path {
1792 851 : &self.owner.0.path
1793 851 : }
1794 :
1795 956 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1796 956 : self.owner.metadata()
1797 956 : }
1798 :
1799 : #[cfg(test)]
1800 32 : pub(crate) async fn as_delta(
1801 32 : &self,
1802 32 : ctx: &RequestContext,
1803 32 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1804 32 : use LayerKind::*;
1805 32 : match self.downloaded.get(&self.owner.0, ctx).await? {
1806 32 : Delta(ref d) => Ok(d),
1807 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1808 : }
1809 32 : }
1810 : }
1811 :
1812 : impl AsLayerDesc for ResidentLayer {
1813 2728 : fn layer_desc(&self) -> &PersistentLayerDesc {
1814 2728 : self.owner.layer_desc()
1815 2728 : }
1816 : }
1817 :
1818 : impl AsRef<Layer> for ResidentLayer {
1819 1190 : fn as_ref(&self) -> &Layer {
1820 1190 : &self.owner
1821 1190 : }
1822 : }
1823 :
1824 : /// Drop the eviction guard.
1825 : impl From<ResidentLayer> for Layer {
1826 460 : fn from(value: ResidentLayer) -> Self {
1827 460 : value.owner
1828 460 : }
1829 : }
1830 :
1831 : use metrics::IntCounter;
1832 :
1833 : pub(crate) struct LayerImplMetrics {
1834 : started_evictions: IntCounter,
1835 : completed_evictions: IntCounter,
1836 : cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
1837 :
1838 : started_deletes: IntCounter,
1839 : completed_deletes: IntCounter,
1840 : failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
1841 :
1842 : rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
1843 : inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
1844 : redownload_after: metrics::Histogram,
1845 : time_to_evict: metrics::Histogram,
1846 : }
1847 :
1848 : impl Default for LayerImplMetrics {
1849 28 : fn default() -> Self {
1850 28 : use enum_map::Enum;
1851 28 :
1852 28 : // reminder: these will be pageserver_layer_* with "_total" suffix
1853 28 :
1854 28 : let started_evictions = metrics::register_int_counter!(
1855 28 : "pageserver_layer_started_evictions",
1856 28 : "Evictions started in the Layer implementation"
1857 28 : )
1858 28 : .unwrap();
1859 28 : let completed_evictions = metrics::register_int_counter!(
1860 28 : "pageserver_layer_completed_evictions",
1861 28 : "Evictions completed in the Layer implementation"
1862 28 : )
1863 28 : .unwrap();
1864 28 :
1865 28 : let cancelled_evictions = metrics::register_int_counter_vec!(
1866 28 : "pageserver_layer_cancelled_evictions_count",
1867 28 : "Different reasons for evictions to have been cancelled or failed",
1868 28 : &["reason"]
1869 28 : )
1870 28 : .unwrap();
1871 28 :
1872 252 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1873 252 : let reason = EvictionCancelled::from_usize(i);
1874 252 : let s = reason.as_str();
1875 252 : cancelled_evictions.with_label_values(&[s])
1876 252 : }));
1877 28 :
1878 28 : let started_deletes = metrics::register_int_counter!(
1879 28 : "pageserver_layer_started_deletes",
1880 28 : "Deletions on drop pending in the Layer implementation"
1881 28 : )
1882 28 : .unwrap();
1883 28 : let completed_deletes = metrics::register_int_counter!(
1884 28 : "pageserver_layer_completed_deletes",
1885 28 : "Deletions on drop completed in the Layer implementation"
1886 28 : )
1887 28 : .unwrap();
1888 28 :
1889 28 : let failed_deletes = metrics::register_int_counter_vec!(
1890 28 : "pageserver_layer_failed_deletes_count",
1891 28 : "Different reasons for deletions on drop to have failed",
1892 28 : &["reason"]
1893 28 : )
1894 28 : .unwrap();
1895 28 :
1896 56 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1897 56 : let reason = DeleteFailed::from_usize(i);
1898 56 : let s = reason.as_str();
1899 56 : failed_deletes.with_label_values(&[s])
1900 56 : }));
1901 28 :
1902 28 : let rare_counters = metrics::register_int_counter_vec!(
1903 28 : "pageserver_layer_assumed_rare_count",
1904 28 : "Times unexpected or assumed rare event happened",
1905 28 : &["event"]
1906 28 : )
1907 28 : .unwrap();
1908 28 :
1909 196 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1910 196 : let event = RareEvent::from_usize(i);
1911 196 : let s = event.as_str();
1912 196 : rare_counters.with_label_values(&[s])
1913 196 : }));
1914 28 :
1915 28 : let inits_cancelled = metrics::register_int_counter!(
1916 28 : "pageserver_layer_inits_cancelled_count",
1917 28 : "Times Layer initialization was cancelled",
1918 28 : )
1919 28 : .unwrap();
1920 28 :
1921 28 : let redownload_after = {
1922 28 : let minute = 60.0;
1923 28 : let hour = 60.0 * minute;
1924 28 : metrics::register_histogram!(
1925 28 : "pageserver_layer_redownloaded_after",
1926 28 : "Time between evicting and re-downloading.",
1927 28 : vec![
1928 28 : 10.0,
1929 28 : 30.0,
1930 28 : minute,
1931 28 : 5.0 * minute,
1932 28 : 15.0 * minute,
1933 28 : 30.0 * minute,
1934 28 : hour,
1935 28 : 12.0 * hour,
1936 28 : ]
1937 28 : )
1938 28 : .unwrap()
1939 28 : };
1940 28 :
1941 28 : let time_to_evict = metrics::register_histogram!(
1942 28 : "pageserver_layer_eviction_held_permit_seconds",
1943 28 : "Time eviction held the permit.",
1944 28 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
1945 28 : )
1946 28 : .unwrap();
1947 28 :
1948 28 : Self {
1949 28 : started_evictions,
1950 28 : completed_evictions,
1951 28 : cancelled_evictions,
1952 28 :
1953 28 : started_deletes,
1954 28 : completed_deletes,
1955 28 : failed_deletes,
1956 28 :
1957 28 : rare_counters,
1958 28 : inits_cancelled,
1959 28 : redownload_after,
1960 28 : time_to_evict,
1961 28 : }
1962 28 : }
1963 : }
1964 :
1965 : impl LayerImplMetrics {
1966 26 : fn inc_started_evictions(&self) {
1967 26 : self.started_evictions.inc();
1968 26 : }
1969 18 : fn inc_completed_evictions(&self) {
1970 18 : self.completed_evictions.inc();
1971 18 : }
1972 8 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
1973 8 : self.cancelled_evictions[reason].inc()
1974 8 : }
1975 :
1976 450 : fn inc_started_deletes(&self) {
1977 450 : self.started_deletes.inc();
1978 450 : }
1979 450 : fn inc_completed_deletes(&self) {
1980 450 : self.completed_deletes.inc();
1981 450 : }
1982 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
1983 0 : self.failed_deletes[reason].inc();
1984 0 : }
1985 :
1986 : /// Counted separatedly from failed layer deletes because we will complete the layer deletion
1987 : /// attempt regardless of failure to delete local file.
1988 0 : fn inc_delete_removes_failed(&self) {
1989 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
1990 0 : }
1991 :
1992 : /// Expected rare just as cancellations are rare, but we could have cancellations separate from
1993 : /// the single caller which can start the download, so use this counter to separte them.
1994 0 : fn inc_init_completed_without_requester(&self) {
1995 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
1996 0 : }
1997 :
1998 : /// Expected rare because cancellations are unexpected, and failures are unexpected
1999 0 : fn inc_download_failed_without_requester(&self) {
2000 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2001 0 : }
2002 :
2003 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2004 : ///
2005 : /// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
2006 : /// Option.
2007 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2008 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2009 0 : }
2010 :
2011 : /// These are only expected for [`Self::inc_init_cancelled`] amount when
2012 : /// running with remote storage.
2013 6 : fn inc_init_needed_no_download(&self) {
2014 6 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2015 6 : }
2016 :
2017 : /// Expected rare because all layer files should be readable and good
2018 0 : fn inc_permanent_loading_failures(&self) {
2019 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2020 0 : }
2021 :
2022 0 : fn inc_init_cancelled(&self) {
2023 0 : self.inits_cancelled.inc()
2024 0 : }
2025 :
2026 6 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2027 6 : self.redownload_after.observe(duration.as_secs_f64())
2028 6 : }
2029 :
2030 : /// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
2031 : /// instead cancel eviction if we would have read waiters. We cannot however separate reads
2032 : /// from other evictions, so this could have noise as well.
2033 1 : fn inc_evicted_with_waiters(&self) {
2034 1 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2035 1 : }
2036 :
2037 : /// Recorded at least initially as the permit is now acquired in async context before
2038 : /// spawn_blocking action.
2039 18 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2040 18 : self.time_to_evict.observe(duration.as_secs_f64())
2041 18 : }
2042 : }
2043 :
2044 : #[derive(Debug, Clone, Copy, enum_map::Enum)]
2045 : enum EvictionCancelled {
2046 : LayerGone,
2047 : TimelineGone,
2048 : VersionCheckFailed,
2049 : FileNotFound,
2050 : RemoveFailed,
2051 : AlreadyReinitialized,
2052 : /// Not evicted because of a pending reinitialization
2053 : LostToDownload,
2054 : /// After eviction, there was a new layer access which cancelled the eviction.
2055 : UpgradedBackOnAccess,
2056 : UnexpectedEvictedState,
2057 : }
2058 :
2059 : impl EvictionCancelled {
2060 252 : fn as_str(&self) -> &'static str {
2061 252 : match self {
2062 28 : EvictionCancelled::LayerGone => "layer_gone",
2063 28 : EvictionCancelled::TimelineGone => "timeline_gone",
2064 28 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2065 28 : EvictionCancelled::FileNotFound => "file_not_found",
2066 28 : EvictionCancelled::RemoveFailed => "remove_failed",
2067 28 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2068 28 : EvictionCancelled::LostToDownload => "lost_to_download",
2069 28 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2070 28 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2071 : }
2072 252 : }
2073 : }
2074 :
2075 : #[derive(enum_map::Enum)]
2076 : enum DeleteFailed {
2077 : TimelineGone,
2078 : DeleteSchedulingFailed,
2079 : }
2080 :
2081 : impl DeleteFailed {
2082 56 : fn as_str(&self) -> &'static str {
2083 56 : match self {
2084 28 : DeleteFailed::TimelineGone => "timeline_gone",
2085 28 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2086 : }
2087 56 : }
2088 : }
2089 :
2090 : #[derive(enum_map::Enum)]
2091 : enum RareEvent {
2092 : RemoveOnDropFailed,
2093 : InitCompletedWithoutRequester,
2094 : DownloadFailedWithoutRequester,
2095 : UpgradedWantedEvicted,
2096 : InitWithoutDownload,
2097 : PermanentLoadingFailure,
2098 : EvictedWithWaiters,
2099 : }
2100 :
2101 : impl RareEvent {
2102 196 : fn as_str(&self) -> &'static str {
2103 196 : use RareEvent::*;
2104 196 :
2105 196 : match self {
2106 28 : RemoveOnDropFailed => "remove_on_drop_failed",
2107 28 : InitCompletedWithoutRequester => "init_completed_without",
2108 28 : DownloadFailedWithoutRequester => "download_failed_without",
2109 28 : UpgradedWantedEvicted => "raced_wanted_evicted",
2110 28 : InitWithoutDownload => "init_needed_no_download",
2111 28 : PermanentLoadingFailure => "permanent_loading_failure",
2112 28 : EvictedWithWaiters => "evicted_with_waiters",
2113 : }
2114 196 : }
2115 : }
2116 :
2117 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2118 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|