Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::HistoricLayerInfo;
5 : use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
6 : use std::ops::Range;
7 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 : use std::sync::{Arc, Weak};
9 : use std::time::{Duration, SystemTime};
10 : use tracing::Instrument;
11 : use utils::id::TimelineId;
12 : use utils::lsn::Lsn;
13 : use utils::sync::{gate, heavier_once_cell};
14 :
15 : use crate::config::PageServerConf;
16 : use crate::context::{DownloadBehavior, RequestContext};
17 : use crate::repository::Key;
18 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
19 : use crate::task_mgr::TaskKind;
20 : use crate::tenant::timeline::{CompactionError, GetVectoredError};
21 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
22 :
23 : use super::delta_layer::{self, DeltaEntry};
24 : use super::image_layer::{self};
25 : use super::{
26 : AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
27 : LayerVisibilityHint, PersistentLayerDesc, ValueReconstructResult, ValueReconstructState,
28 : ValuesReconstructState,
29 : };
30 :
31 : use utils::generation::Generation;
32 :
33 : #[cfg(test)]
34 : mod tests;
35 :
36 : #[cfg(test)]
37 : mod failpoints;
38 :
39 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
40 : /// range of LSNs.
41 : ///
42 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
43 : /// layers are used to ingest incoming WAL, and provide fast access to the
44 : /// recent page versions. On-disk layers are stored as files on disk, and are
45 : /// immutable. This type represents the on-disk kind while in-memory kind are represented by
46 : /// [`InMemoryLayer`].
47 : ///
48 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
49 : /// A delta layer contains all modifications within a range of LSNs and keys.
50 : /// An image layer is a snapshot of all the data in a key-range, at a single
51 : /// LSN.
52 : ///
53 : /// This type models the on-disk layers, which can be evicted and on-demand downloaded. As a
54 : /// general goal, read accesses should always win eviction and eviction should not wait for
55 : /// download.
56 : ///
57 : /// ### State transitions
58 : ///
59 : /// The internal state of `Layer` is composed of most importantly the on-filesystem state and the
60 : /// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
61 : /// right size) or deleted.
62 : ///
63 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
64 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
65 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
66 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
67 : /// cancelling the eviction.
68 : ///
69 : /// ```text
70 : /// +-----------------+ get_or_maybe_download +--------------------------------+
71 : /// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
72 : /// | ENOENT | /->| |
73 : /// +-----------------+ | +--------------------------------+
74 : /// ^ | | ^
75 : /// | get_or_maybe_download | | | get_or_maybe_download, either:
76 : /// evict_blocking | /-------------------------/ | | - upgrade weak to strong
77 : /// | | | | - re-initialize without download
78 : /// | | evict_and_wait | |
79 : /// +-----------------+ v |
80 : /// | not initialized | on_downloaded_layer_drop +--------------------------------------+
81 : /// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
82 : /// +-----------------+ +--------------------------------------+
83 : /// ```
84 : ///
85 : /// ### Unsupported
86 : ///
87 : /// - Evicting by the operator deleting files from the filesystem
88 : ///
89 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
90 : #[derive(Clone)]
91 : pub(crate) struct Layer(Arc<LayerInner>);
92 :
93 : impl std::fmt::Display for Layer {
94 1918 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 1918 : write!(
96 1918 : f,
97 1918 : "{}{}",
98 1918 : self.layer_desc().short_id(),
99 1918 : self.0.generation.get_suffix()
100 1918 : )
101 1918 : }
102 : }
103 :
104 : impl std::fmt::Debug for Layer {
105 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 0 : write!(f, "{}", self)
107 0 : }
108 : }
109 :
110 : impl AsLayerDesc for Layer {
111 502559 : fn layer_desc(&self) -> &PersistentLayerDesc {
112 502559 : self.0.layer_desc()
113 502559 : }
114 : }
115 :
116 : impl PartialEq for Layer {
117 4 : fn eq(&self, other: &Self) -> bool {
118 4 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
119 4 : }
120 : }
121 :
122 1646 : pub(crate) fn local_layer_path(
123 1646 : conf: &PageServerConf,
124 1646 : tenant_shard_id: &TenantShardId,
125 1646 : timeline_id: &TimelineId,
126 1646 : layer_file_name: &LayerName,
127 1646 : generation: &Generation,
128 1646 : ) -> Utf8PathBuf {
129 1646 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
130 1646 :
131 1646 : if generation.is_none() {
132 : // Without a generation, we may only use legacy path style
133 0 : timeline_path.join(layer_file_name.to_string())
134 : } else {
135 1646 : timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
136 : }
137 1646 : }
138 :
139 : impl Layer {
140 : /// Creates a layer value for a file we know to not be resident.
141 0 : pub(crate) fn for_evicted(
142 0 : conf: &'static PageServerConf,
143 0 : timeline: &Arc<Timeline>,
144 0 : file_name: LayerName,
145 0 : metadata: LayerFileMetadata,
146 0 : ) -> Self {
147 0 : let local_path = local_layer_path(
148 0 : conf,
149 0 : &timeline.tenant_shard_id,
150 0 : &timeline.timeline_id,
151 0 : &file_name,
152 0 : &metadata.generation,
153 0 : );
154 0 :
155 0 : let desc = PersistentLayerDesc::from_filename(
156 0 : timeline.tenant_shard_id,
157 0 : timeline.timeline_id,
158 0 : file_name,
159 0 : metadata.file_size,
160 0 : );
161 0 :
162 0 : let owner = Layer(Arc::new(LayerInner::new(
163 0 : conf,
164 0 : timeline,
165 0 : local_path,
166 0 : desc,
167 0 : None,
168 0 : metadata.generation,
169 0 : metadata.shard,
170 0 : )));
171 0 :
172 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
173 :
174 0 : owner
175 0 : }
176 :
177 : /// Creates a Layer value for a file we know to be resident in timeline directory.
178 24 : pub(crate) fn for_resident(
179 24 : conf: &'static PageServerConf,
180 24 : timeline: &Arc<Timeline>,
181 24 : local_path: Utf8PathBuf,
182 24 : file_name: LayerName,
183 24 : metadata: LayerFileMetadata,
184 24 : ) -> ResidentLayer {
185 24 : let desc = PersistentLayerDesc::from_filename(
186 24 : timeline.tenant_shard_id,
187 24 : timeline.timeline_id,
188 24 : file_name,
189 24 : metadata.file_size,
190 24 : );
191 24 :
192 24 : let mut resident = None;
193 24 :
194 24 : let owner = Layer(Arc::new_cyclic(|owner| {
195 24 : let inner = Arc::new(DownloadedLayer {
196 24 : owner: owner.clone(),
197 24 : kind: tokio::sync::OnceCell::default(),
198 24 : version: 0,
199 24 : });
200 24 : resident = Some(inner.clone());
201 24 :
202 24 : LayerInner::new(
203 24 : conf,
204 24 : timeline,
205 24 : local_path,
206 24 : desc,
207 24 : Some(inner),
208 24 : metadata.generation,
209 24 : metadata.shard,
210 24 : )
211 24 : }));
212 24 :
213 24 : let downloaded = resident.expect("just initialized");
214 24 :
215 24 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
216 :
217 24 : timeline
218 24 : .metrics
219 24 : .resident_physical_size_add(metadata.file_size);
220 24 :
221 24 : ResidentLayer { downloaded, owner }
222 24 : }
223 :
224 : /// Creates a Layer value for freshly written out new layer file by renaming it from a
225 : /// temporary path.
226 1638 : pub(crate) fn finish_creating(
227 1638 : conf: &'static PageServerConf,
228 1638 : timeline: &Arc<Timeline>,
229 1638 : desc: PersistentLayerDesc,
230 1638 : temp_path: &Utf8Path,
231 1638 : ) -> anyhow::Result<ResidentLayer> {
232 1638 : let mut resident = None;
233 1638 :
234 1638 : let owner = Layer(Arc::new_cyclic(|owner| {
235 1638 : let inner = Arc::new(DownloadedLayer {
236 1638 : owner: owner.clone(),
237 1638 : kind: tokio::sync::OnceCell::default(),
238 1638 : version: 0,
239 1638 : });
240 1638 : resident = Some(inner.clone());
241 1638 :
242 1638 : let local_path = local_layer_path(
243 1638 : conf,
244 1638 : &timeline.tenant_shard_id,
245 1638 : &timeline.timeline_id,
246 1638 : &desc.layer_name(),
247 1638 : &timeline.generation,
248 1638 : );
249 1638 :
250 1638 : LayerInner::new(
251 1638 : conf,
252 1638 : timeline,
253 1638 : local_path,
254 1638 : desc,
255 1638 : Some(inner),
256 1638 : timeline.generation,
257 1638 : timeline.get_shard_index(),
258 1638 : )
259 1638 : }));
260 1638 :
261 1638 : let downloaded = resident.expect("just initialized");
262 1638 :
263 1638 : // We never want to overwrite an existing file, so we use `RENAME_NOREPLACE`.
264 1638 : // TODO: this leaves the temp file in place if the rename fails, risking us running
265 1638 : // out of space. Should we clean it up here or does the calling context deal with this?
266 1638 : utils::fs_ext::rename_noreplace(temp_path.as_std_path(), owner.local_path().as_std_path())
267 1638 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
268 :
269 1638 : Ok(ResidentLayer { downloaded, owner })
270 1638 : }
271 :
272 : /// Requests the layer to be evicted and waits for this to be done.
273 : ///
274 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
275 : ///
276 : /// If for a bad luck or blocking of the executor, we miss the actual eviction and the layer is
277 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
278 : ///
279 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
280 : /// will happen regardless the future returned by this method completing unless there is a
281 : /// read access before eviction gets to complete.
282 : ///
283 : /// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
284 : /// of download-evict cycle on retry.
285 36 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
286 49 : self.0.evict_and_wait(timeout).await
287 32 : }
288 :
289 : /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
290 : /// then.
291 : ///
292 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
293 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
294 : /// the value this is called on gets dropped.
295 : ///
296 : /// This is ensured by both of those methods accepting references to Layer.
297 : ///
298 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
299 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
300 462 : pub(crate) fn delete_on_drop(&self) {
301 462 : self.0.delete_on_drop();
302 462 : }
303 :
304 : /// Return data needed to reconstruct given page at LSN.
305 : ///
306 : /// It is up to the caller to collect more data from the previous layer and
307 : /// perform WAL redo, if necessary.
308 : ///
309 : /// # Cancellation-Safety
310 : ///
311 : /// This method is cancellation-safe.
312 324 : pub(crate) async fn get_value_reconstruct_data(
313 324 : &self,
314 324 : key: Key,
315 324 : lsn_range: Range<Lsn>,
316 324 : reconstruct_data: &mut ValueReconstructState,
317 324 : ctx: &RequestContext,
318 324 : ) -> anyhow::Result<ValueReconstructResult> {
319 : use anyhow::ensure;
320 :
321 324 : let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
322 324 : self.0.access_stats.record_access(ctx);
323 324 :
324 324 : if self.layer_desc().is_delta {
325 320 : ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
326 320 : ensure!(self.layer_desc().key_range.contains(&key));
327 : } else {
328 4 : ensure!(self.layer_desc().key_range.contains(&key));
329 4 : ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
330 4 : ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
331 : }
332 :
333 324 : layer
334 324 : .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
335 324 : .instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
336 25 : .await
337 324 : .with_context(|| format!("get_value_reconstruct_data for layer {self}"))
338 324 : }
339 :
340 211905 : pub(crate) async fn get_values_reconstruct_data(
341 211905 : &self,
342 211905 : keyspace: KeySpace,
343 211905 : lsn_range: Range<Lsn>,
344 211905 : reconstruct_data: &mut ValuesReconstructState,
345 211905 : ctx: &RequestContext,
346 211905 : ) -> Result<(), GetVectoredError> {
347 211905 : let layer = self
348 211905 : .0
349 211905 : .get_or_maybe_download(true, Some(ctx))
350 0 : .await
351 211905 : .map_err(|err| match err {
352 0 : DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
353 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
354 211905 : })?;
355 :
356 211905 : self.0.access_stats.record_access(ctx);
357 211905 :
358 211905 : layer
359 211905 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
360 211905 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
361 98423 : .await
362 211905 : .map_err(|err| match err {
363 0 : GetVectoredError::Other(err) => GetVectoredError::Other(
364 0 : err.context(format!("get_values_reconstruct_data for layer {self}")),
365 0 : ),
366 0 : err => err,
367 211905 : })
368 211905 : }
369 :
370 : /// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
371 : #[allow(dead_code)]
372 0 : pub(crate) async fn load_key_values(
373 0 : &self,
374 0 : ctx: &RequestContext,
375 0 : ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
376 0 : let layer = self
377 0 : .0
378 0 : .get_or_maybe_download(true, Some(ctx))
379 0 : .await
380 0 : .map_err(|err| match err {
381 0 : DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
382 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
383 0 : })?;
384 0 : layer.load_key_values(&self.0, ctx).await
385 0 : }
386 :
387 : /// Download the layer if evicted.
388 : ///
389 : /// Will not error when the layer is already downloaded.
390 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
391 0 : self.0.get_or_maybe_download(true, None).await?;
392 0 : Ok(())
393 0 : }
394 :
395 : /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
396 : /// while the guard exists.
397 : ///
398 : /// Returns None if the layer is currently evicted or becoming evicted.
399 : #[cfg(test)]
400 20 : pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
401 20 : let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
402 :
403 14 : Some(ResidentLayer {
404 14 : downloaded,
405 14 : owner: self.clone(),
406 14 : })
407 20 : }
408 :
409 : /// Weak indicator of is the layer resident or not. Good enough for eviction, which can deal
410 : /// with `EvictionError::NotFound`.
411 : ///
412 : /// Returns `true` if this layer might be resident, or `false`, if it most likely evicted or
413 : /// will be unless a read happens soon.
414 44 : pub(crate) fn is_likely_resident(&self) -> bool {
415 44 : self.0
416 44 : .inner
417 44 : .get()
418 44 : .map(|rowe| rowe.is_likely_resident())
419 44 : .unwrap_or(false)
420 44 : }
421 :
422 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
423 486 : pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
424 486 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
425 :
426 486 : Ok(ResidentLayer {
427 486 : downloaded,
428 486 : owner: self.clone(),
429 486 : })
430 486 : }
431 :
432 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
433 0 : self.0.info(reset)
434 0 : }
435 :
436 2484 : pub(crate) fn access_stats(&self) -> &LayerAccessStats {
437 2484 : &self.0.access_stats
438 2484 : }
439 :
440 1640 : pub(crate) fn local_path(&self) -> &Utf8Path {
441 1640 : &self.0.path
442 1640 : }
443 :
444 320 : pub(crate) fn debug_str(&self) -> &Arc<str> {
445 320 : &self.0.debug_str
446 320 : }
447 :
448 1942 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
449 1942 : self.0.metadata()
450 1942 : }
451 :
452 0 : pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
453 0 : self.0
454 0 : .timeline
455 0 : .upgrade()
456 0 : .map(|timeline| timeline.timeline_id)
457 0 : }
458 :
459 : /// Traditional debug dumping facility
460 : #[allow(unused)]
461 4 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
462 4 : self.0.desc.dump();
463 4 :
464 4 : if verbose {
465 : // for now, unconditionally download everything, even if that might not be wanted.
466 4 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
467 8 : l.dump(&self.0, ctx).await?
468 0 : }
469 :
470 4 : Ok(())
471 4 : }
472 :
473 : /// Waits until this layer has been dropped (and if needed, local file deletion and remote
474 : /// deletion scheduling has completed).
475 : ///
476 : /// Does not start local deletion, use [`Self::delete_on_drop`] for that
477 : /// separatedly.
478 : #[cfg(any(feature = "testing", test))]
479 2 : pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
480 2 : let mut rx = self.0.status.as_ref().unwrap().subscribe();
481 :
482 2 : async move {
483 : loop {
484 6 : if rx.changed().await.is_err() {
485 2 : break;
486 0 : }
487 : }
488 2 : }
489 2 : }
490 :
491 2484 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
492 2484 : let old_visibility = self.access_stats().set_visibility(visibility.clone());
493 2484 : use LayerVisibilityHint::*;
494 2484 : match (old_visibility, visibility) {
495 : (Visible, Covered) => {
496 : // Subtract this layer's contribution to the visible size metric
497 34 : if let Some(tl) = self.0.timeline.upgrade() {
498 34 : tl.metrics
499 34 : .visible_physical_size_gauge
500 34 : .sub(self.0.desc.file_size)
501 0 : }
502 : }
503 : (Covered, Visible) => {
504 : // Add this layer's contribution to the visible size metric
505 0 : if let Some(tl) = self.0.timeline.upgrade() {
506 0 : tl.metrics
507 0 : .visible_physical_size_gauge
508 0 : .add(self.0.desc.file_size)
509 0 : }
510 : }
511 2450 : (Covered, Covered) | (Visible, Visible) => {
512 2450 : // no change
513 2450 : }
514 : }
515 2484 : }
516 : }
517 :
518 : /// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
519 : ///
520 : /// However when we want something evicted, we cannot evict it right away as there might be current
521 : /// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
522 : /// read with [`Layer::get_value_reconstruct_data`].
523 : ///
524 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
525 : #[derive(Debug)]
526 : enum ResidentOrWantedEvicted {
527 : Resident(Arc<DownloadedLayer>),
528 : WantedEvicted(Weak<DownloadedLayer>, usize),
529 : }
530 :
531 : impl ResidentOrWantedEvicted {
532 : /// Non-mutating access to the a DownloadedLayer, if possible.
533 : ///
534 : /// This is not used on the read path (anything that calls
535 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
536 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
537 : #[cfg(test)]
538 14 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
539 14 : match self {
540 14 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
541 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
542 : }
543 14 : }
544 :
545 : /// Best-effort query for residency right now, not as strong guarantee as receiving a strong
546 : /// reference from `ResidentOrWantedEvicted::get`.
547 38 : fn is_likely_resident(&self) -> bool {
548 38 : match self {
549 32 : ResidentOrWantedEvicted::Resident(_) => true,
550 6 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
551 : }
552 38 : }
553 :
554 : /// Upgrades any weak to strong if possible.
555 : ///
556 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
557 : /// happened.
558 212723 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
559 212723 : match self {
560 212715 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
561 8 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
562 0 : Some(strong) => {
563 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
564 0 :
565 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
566 0 :
567 0 : Some((strong, true))
568 : }
569 8 : None => None,
570 : },
571 : }
572 212723 : }
573 :
574 : /// When eviction is first requested, drop down to holding a [`Weak`].
575 : ///
576 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
577 : /// drop the possibly last strong reference outside of the mutex of
578 : /// [`heavier_once_cell::OnceCell`].
579 30 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
580 30 : match self {
581 26 : ResidentOrWantedEvicted::Resident(strong) => {
582 26 : let weak = Arc::downgrade(strong);
583 26 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
584 26 : std::mem::swap(self, &mut temp);
585 26 : match temp {
586 26 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
587 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
588 : }
589 : }
590 4 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
591 : }
592 30 : }
593 : }
594 :
595 : struct LayerInner {
596 : /// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of
597 : /// [`Self::path`].
598 : conf: &'static PageServerConf,
599 :
600 : /// Full path to the file; unclear if this should exist anymore.
601 : path: Utf8PathBuf,
602 :
603 : /// String representation of the layer, used for traversal id.
604 : debug_str: Arc<str>,
605 :
606 : desc: PersistentLayerDesc,
607 :
608 : /// Timeline access is needed for remote timeline client and metrics.
609 : ///
610 : /// There should not be an access to timeline for any reason without entering the
611 : /// [`Timeline::gate`] at the same time.
612 : timeline: Weak<Timeline>,
613 :
614 : access_stats: LayerAccessStats,
615 :
616 : /// This custom OnceCell is backed by std mutex, but only held for short time periods.
617 : ///
618 : /// Filesystem changes (download, evict) are only done while holding a permit which the
619 : /// `heavier_once_cell` provides.
620 : ///
621 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
622 : /// possibly read while not holding it.
623 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
624 :
625 : /// Do we want to delete locally and remotely this when `LayerInner` is dropped
626 : wanted_deleted: AtomicBool,
627 :
628 : /// Version is to make sure we will only evict a specific initialization of the downloaded file.
629 : ///
630 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
631 : /// `ResidentOrWantedEvicted::WantedEvicted`.
632 : version: AtomicUsize,
633 :
634 : /// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
635 : /// starts, or completes.
636 : ///
637 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
638 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
639 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
640 : /// [`ResidentOrWantedEvicted::Resident`] on access.
641 : ///
642 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
643 : status: Option<tokio::sync::watch::Sender<Status>>,
644 :
645 : /// Counter for exponential backoff with the download.
646 : ///
647 : /// This is atomic only for the purposes of having additional data only accessed while holding
648 : /// the InitPermit.
649 : consecutive_failures: AtomicUsize,
650 :
651 : /// The generation of this Layer.
652 : ///
653 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
654 : /// for created layers from [`Timeline::generation`].
655 : generation: Generation,
656 :
657 : /// The shard of this Layer.
658 : ///
659 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
660 : /// current `ShardIdentity`` (TODO: add link once it's introduced).
661 : ///
662 : /// For loaded layers, this may be some other value if the tenant has undergone
663 : /// a shard split since the layer was originally written.
664 : shard: ShardIndex,
665 :
666 : /// When the Layer was last evicted but has not been downloaded since.
667 : ///
668 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
669 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
670 :
671 : #[cfg(test)]
672 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
673 : }
674 :
675 : impl std::fmt::Display for LayerInner {
676 30 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677 30 : write!(f, "{}", self.layer_desc().short_id())
678 30 : }
679 : }
680 :
681 : impl AsLayerDesc for LayerInner {
682 504859 : fn layer_desc(&self) -> &PersistentLayerDesc {
683 504859 : &self.desc
684 504859 : }
685 : }
686 :
687 : #[derive(Debug, Clone, Copy)]
688 : enum Status {
689 : Resident,
690 : Evicted,
691 : Downloading,
692 : }
693 :
694 : impl Drop for LayerInner {
695 520 : fn drop(&mut self) {
696 : // if there was a pending eviction, mark it cancelled here to balance metrics
697 520 : if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
698 2 : {
699 2 : // eviction has already been started
700 2 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
701 2 :
702 2 : // eviction request is intentionally not honored as no one is present to wait for it
703 2 : // and we could be delaying shutdown for nothing.
704 518 : }
705 :
706 520 : if let Some(timeline) = self.timeline.upgrade() {
707 : // Only need to decrement metrics if the timeline still exists: otherwise
708 : // it will have already de-registered these metrics via TimelineMetrics::shutdown
709 504 : if self.desc.is_delta() {
710 466 : timeline.metrics.layer_count_delta.dec();
711 466 : timeline.metrics.layer_size_delta.sub(self.desc.file_size);
712 466 : } else {
713 38 : timeline.metrics.layer_count_image.dec();
714 38 : timeline.metrics.layer_size_image.sub(self.desc.file_size);
715 38 : }
716 :
717 504 : if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
718 498 : timeline
719 498 : .metrics
720 498 : .visible_physical_size_gauge
721 498 : .sub(self.desc.file_size);
722 498 : }
723 16 : }
724 :
725 520 : if !*self.wanted_deleted.get_mut() {
726 66 : return;
727 454 : }
728 :
729 454 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
730 :
731 454 : let path = std::mem::take(&mut self.path);
732 454 : let file_name = self.layer_desc().layer_name();
733 454 : let file_size = self.layer_desc().file_size;
734 454 : let timeline = self.timeline.clone();
735 454 : let meta = self.metadata();
736 454 : let status = self.status.take();
737 454 :
738 454 : Self::spawn_blocking(move || {
739 454 : let _g = span.entered();
740 454 :
741 454 : // carry this until we are finished for [`Layer::wait_drop`] support
742 454 : let _status = status;
743 :
744 454 : let Some(timeline) = timeline.upgrade() else {
745 : // no need to nag that timeline is gone: under normal situation on
746 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
747 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
748 0 : return;
749 : };
750 :
751 454 : let Ok(_guard) = timeline.gate.enter() else {
752 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
753 0 : return;
754 : };
755 :
756 454 : let removed = match std::fs::remove_file(path) {
757 452 : Ok(()) => true,
758 2 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
759 2 : // until we no longer do detaches by removing all local files before removing the
760 2 : // tenant from the global map, we will always get these errors even if we knew what
761 2 : // is the latest state.
762 2 : //
763 2 : // we currently do not track the latest state, so we'll also end up here on evicted
764 2 : // layers.
765 2 : false
766 : }
767 0 : Err(e) => {
768 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
769 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
770 0 : false
771 : }
772 : };
773 :
774 454 : if removed {
775 452 : timeline.metrics.resident_physical_size_sub(file_size);
776 452 : }
777 454 : let res = timeline
778 454 : .remote_client
779 454 : .schedule_deletion_of_unlinked(vec![(file_name, meta)]);
780 :
781 454 : if let Err(e) = res {
782 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
783 : // demonstrating this deadlock (without spawn_blocking): stop will drop
784 : // queued items, which will have ResidentLayer's, and those drops would try
785 : // to re-entrantly lock the RemoteTimelineClient inner state.
786 0 : if !timeline.is_active() {
787 0 : tracing::info!("scheduling deletion on drop failed: {e:#}");
788 : } else {
789 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
790 : }
791 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
792 454 : } else {
793 454 : LAYER_IMPL_METRICS.inc_completed_deletes();
794 454 : }
795 454 : });
796 520 : }
797 : }
798 :
799 : impl LayerInner {
800 : #[allow(clippy::too_many_arguments)]
801 1662 : fn new(
802 1662 : conf: &'static PageServerConf,
803 1662 : timeline: &Arc<Timeline>,
804 1662 : local_path: Utf8PathBuf,
805 1662 : desc: PersistentLayerDesc,
806 1662 : downloaded: Option<Arc<DownloadedLayer>>,
807 1662 : generation: Generation,
808 1662 : shard: ShardIndex,
809 1662 : ) -> Self {
810 1662 : let (inner, version, init_status) = if let Some(inner) = downloaded {
811 1662 : let version = inner.version;
812 1662 : let resident = ResidentOrWantedEvicted::Resident(inner);
813 1662 : (
814 1662 : heavier_once_cell::OnceCell::new(resident),
815 1662 : version,
816 1662 : Status::Resident,
817 1662 : )
818 : } else {
819 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
820 : };
821 :
822 : // This object acts as a RAII guard on these metrics: increment on construction
823 1662 : if desc.is_delta() {
824 1398 : timeline.metrics.layer_count_delta.inc();
825 1398 : timeline.metrics.layer_size_delta.add(desc.file_size);
826 1398 : } else {
827 264 : timeline.metrics.layer_count_image.inc();
828 264 : timeline.metrics.layer_size_image.add(desc.file_size);
829 264 : }
830 :
831 : // New layers are visible by default. This metric is later updated on drop or in set_visibility
832 1662 : timeline
833 1662 : .metrics
834 1662 : .visible_physical_size_gauge
835 1662 : .add(desc.file_size);
836 1662 :
837 1662 : LayerInner {
838 1662 : conf,
839 1662 : debug_str: {
840 1662 : format!("timelines/{}/{}", timeline.timeline_id, desc.layer_name()).into()
841 1662 : },
842 1662 : path: local_path,
843 1662 : desc,
844 1662 : timeline: Arc::downgrade(timeline),
845 1662 : access_stats: Default::default(),
846 1662 : wanted_deleted: AtomicBool::new(false),
847 1662 : inner,
848 1662 : version: AtomicUsize::new(version),
849 1662 : status: Some(tokio::sync::watch::channel(init_status).0),
850 1662 : consecutive_failures: AtomicUsize::new(0),
851 1662 : generation,
852 1662 : shard,
853 1662 : last_evicted_at: std::sync::Mutex::default(),
854 1662 : #[cfg(test)]
855 1662 : failpoints: Default::default(),
856 1662 : }
857 1662 : }
858 :
859 462 : fn delete_on_drop(&self) {
860 462 : let res =
861 462 : self.wanted_deleted
862 462 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
863 462 :
864 462 : if res.is_ok() {
865 458 : LAYER_IMPL_METRICS.inc_started_deletes();
866 458 : }
867 462 : }
868 :
869 : /// Cancellation safe, however dropping the future and calling this method again might result
870 : /// in a new attempt to evict OR join the previously started attempt.
871 108 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
872 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
873 : let mut rx = self.status.as_ref().unwrap().subscribe();
874 :
875 : {
876 : let current = rx.borrow_and_update();
877 : match &*current {
878 : Status::Resident => {
879 : // we might get lucky and evict this; continue
880 : }
881 : Status::Evicted | Status::Downloading => {
882 : // it is already evicted
883 : return Err(EvictionError::NotFound);
884 : }
885 : }
886 : }
887 :
888 : let strong = {
889 : match self.inner.get() {
890 : Some(mut either) => either.downgrade(),
891 : None => {
892 : // we already have a scheduled eviction, which just has not gotten to run yet.
893 : // it might still race with a read access, but that could also get cancelled,
894 : // so let's say this is not evictable.
895 : return Err(EvictionError::NotFound);
896 : }
897 : }
898 : };
899 :
900 : if strong.is_some() {
901 : // drop the DownloadedLayer outside of the holding the guard
902 : drop(strong);
903 :
904 : // idea here is that only one evicter should ever get to witness a strong reference,
905 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
906 : // cancelled eviction and signal us, like it currently does.
907 : //
908 : // a second concurrent evict_and_wait will not see a strong reference.
909 : LAYER_IMPL_METRICS.inc_started_evictions();
910 : }
911 :
912 : let changed = rx.changed();
913 : let changed = tokio::time::timeout(timeout, changed).await;
914 :
915 : let Ok(changed) = changed else {
916 : return Err(EvictionError::Timeout);
917 : };
918 :
919 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
920 :
921 : let current = rx.borrow_and_update();
922 :
923 : match &*current {
924 : // the easiest case
925 : Status::Evicted => Ok(()),
926 : // it surely was evicted in between, but then there was a new access now; we can't know
927 : // if it'll succeed so lets just call it evicted
928 : Status::Downloading => Ok(()),
929 : // either the download which was started after eviction completed already, or it was
930 : // never evicted
931 : Status::Resident => Err(EvictionError::Downloaded),
932 : }
933 : }
934 :
935 : /// Cancellation safe.
936 212731 : async fn get_or_maybe_download(
937 212731 : self: &Arc<Self>,
938 212731 : allow_download: bool,
939 212731 : ctx: Option<&RequestContext>,
940 212731 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
941 16 : let (weak, permit) = {
942 : // get_or_init_detached can:
943 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
944 : // - be slow (wait for semaphore permit or closing)
945 212731 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
946 :
947 212731 : let locked = self
948 212731 : .inner
949 212731 : .get_or_init_detached()
950 5 : .await
951 212731 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
952 212731 :
953 212731 : scopeguard::ScopeGuard::into_inner(init_cancelled);
954 :
955 212715 : match locked {
956 : // this path could had been a RwLock::read
957 212715 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
958 0 : Ok(Ok((strong, _))) => {
959 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
960 0 : // previously a `evict_and_wait` was received. this is the only place when we
961 0 : // send out an update without holding the InitPermit.
962 0 : //
963 0 : // note that we also have dropped the Guard; this is fine, because we just made
964 0 : // a state change and are holding a strong reference to be returned.
965 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
966 0 : LAYER_IMPL_METRICS
967 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
968 0 :
969 0 : return Ok(strong);
970 : }
971 8 : Ok(Err(guard)) => {
972 8 : // path to here: we won the eviction, the file should still be on the disk.
973 8 : let (weak, permit) = guard.take_and_deinit();
974 8 : (Some(weak), permit)
975 : }
976 8 : Err(permit) => (None, permit),
977 : }
978 : };
979 :
980 16 : if let Some(weak) = weak {
981 : // only drop the weak after dropping the heavier_once_cell guard
982 8 : assert!(
983 8 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
984 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
985 : );
986 8 : }
987 :
988 16 : let timeline = self
989 16 : .timeline
990 16 : .upgrade()
991 16 : .ok_or_else(|| DownloadError::TimelineShutdown)?;
992 :
993 : // count cancellations, which currently remain largely unexpected
994 16 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
995 :
996 : // check if we really need to be downloaded: this can happen if a read access won the
997 : // semaphore before eviction.
998 : //
999 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
1000 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
1001 16 : let needs_download = self
1002 16 : .needs_download()
1003 17 : .await
1004 16 : .map_err(DownloadError::PreStatFailed);
1005 16 :
1006 16 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1007 :
1008 16 : let needs_download = needs_download?;
1009 :
1010 16 : let Some(reason) = needs_download else {
1011 : // the file is present locally because eviction has not had a chance to run yet
1012 :
1013 : #[cfg(test)]
1014 8 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
1015 2 : .await?;
1016 :
1017 6 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
1018 6 :
1019 6 : return Ok(self.initialize_after_layer_is_on_disk(permit));
1020 : };
1021 :
1022 : // we must download; getting cancelled before spawning the download is not an issue as
1023 : // any still running eviction would not find anything to evict.
1024 :
1025 8 : if let NeedsDownload::NotFile(ft) = reason {
1026 0 : return Err(DownloadError::NotFile(ft));
1027 8 : }
1028 :
1029 8 : if let Some(ctx) = ctx {
1030 2 : self.check_expected_download(ctx)?;
1031 6 : }
1032 :
1033 8 : if !allow_download {
1034 : // this is only used from tests, but it is hard to test without the boolean
1035 2 : return Err(DownloadError::DownloadRequired);
1036 6 : }
1037 6 :
1038 6 : let download_ctx = ctx
1039 6 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
1040 6 : .unwrap_or(RequestContext::new(
1041 6 : TaskKind::LayerDownload,
1042 6 : DownloadBehavior::Download,
1043 6 : ));
1044 :
1045 6 : async move {
1046 6 : tracing::info!(%reason, "downloading on-demand");
1047 :
1048 6 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1049 6 : let res = self
1050 6 : .download_init_and_wait(timeline, permit, download_ctx)
1051 10 : .await?;
1052 6 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1053 6 : Ok(res)
1054 6 : }
1055 6 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
1056 10 : .await
1057 212731 : }
1058 :
1059 : /// Nag or fail per RequestContext policy
1060 2 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1061 2 : use crate::context::DownloadBehavior::*;
1062 2 : let b = ctx.download_behavior();
1063 2 : match b {
1064 2 : Download => Ok(()),
1065 : Warn | Error => {
1066 0 : tracing::info!(
1067 0 : "unexpectedly on-demand downloading for task kind {:?}",
1068 0 : ctx.task_kind()
1069 : );
1070 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1071 :
1072 0 : let really_error =
1073 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1074 :
1075 0 : if really_error {
1076 : // this check is only probablistic, seems like flakyness footgun
1077 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1078 : } else {
1079 0 : Ok(())
1080 : }
1081 : }
1082 : }
1083 2 : }
1084 :
1085 : /// Actual download, at most one is executed at the time.
1086 6 : async fn download_init_and_wait(
1087 6 : self: &Arc<Self>,
1088 6 : timeline: Arc<Timeline>,
1089 6 : permit: heavier_once_cell::InitPermit,
1090 6 : ctx: RequestContext,
1091 6 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1092 6 : debug_assert_current_span_has_tenant_and_timeline_id();
1093 6 :
1094 6 : let (tx, rx) = tokio::sync::oneshot::channel();
1095 6 :
1096 6 : let this: Arc<Self> = self.clone();
1097 :
1098 6 : let guard = timeline
1099 6 : .gate
1100 6 : .enter()
1101 6 : .map_err(|_| DownloadError::DownloadCancelled)?;
1102 :
1103 6 : Self::spawn(
1104 6 : async move {
1105 0 : let _guard = guard;
1106 0 :
1107 0 : // now that we have commited to downloading, send out an update to:
1108 0 : // - unhang any pending eviction
1109 0 : // - break out of evict_and_wait
1110 0 : this.status
1111 0 : .as_ref()
1112 0 : .unwrap()
1113 0 : .send_replace(Status::Downloading);
1114 6 :
1115 6 : #[cfg(test)]
1116 6 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1117 2 : .await
1118 6 : .unwrap();
1119 :
1120 98 : let res = this.download_and_init(timeline, permit, &ctx).await;
1121 :
1122 6 : if let Err(res) = tx.send(res) {
1123 0 : match res {
1124 0 : Ok(_res) => {
1125 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1126 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1127 : }
1128 0 : Err(e) => {
1129 0 : tracing::info!(
1130 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1131 : );
1132 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1133 : }
1134 : }
1135 6 : }
1136 6 : }
1137 6 : .in_current_span(),
1138 6 : );
1139 6 :
1140 10 : match rx.await {
1141 6 : Ok(Ok(res)) => Ok(res),
1142 : Ok(Err(remote_storage::DownloadError::Cancelled)) => {
1143 0 : Err(DownloadError::DownloadCancelled)
1144 : }
1145 0 : Ok(Err(_)) => Err(DownloadError::DownloadFailed),
1146 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1147 : }
1148 6 : }
1149 :
1150 6 : async fn download_and_init(
1151 6 : self: &Arc<LayerInner>,
1152 6 : timeline: Arc<Timeline>,
1153 6 : permit: heavier_once_cell::InitPermit,
1154 6 : ctx: &RequestContext,
1155 6 : ) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
1156 6 : let result = timeline
1157 6 : .remote_client
1158 6 : .download_layer_file(
1159 6 : &self.desc.layer_name(),
1160 6 : &self.metadata(),
1161 6 : &self.path,
1162 6 : &timeline.cancel,
1163 6 : ctx,
1164 6 : )
1165 93 : .await;
1166 :
1167 6 : match result {
1168 6 : Ok(size) => {
1169 6 : assert_eq!(size, self.desc.file_size);
1170 :
1171 6 : match self.needs_download().await {
1172 0 : Ok(Some(reason)) => {
1173 0 : // this is really a bug in needs_download or remote timeline client
1174 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1175 : }
1176 6 : Ok(None) => {
1177 6 : // as expected
1178 6 : }
1179 0 : Err(e) => {
1180 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1181 : }
1182 : }
1183 :
1184 6 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1185 6 : timeline
1186 6 : .metrics
1187 6 : .resident_physical_size_add(self.desc.file_size);
1188 6 : self.consecutive_failures.store(0, Ordering::Relaxed);
1189 6 :
1190 6 : let since_last_eviction = self
1191 6 : .last_evicted_at
1192 6 : .lock()
1193 6 : .unwrap()
1194 6 : .take()
1195 6 : .map(|ts| ts.elapsed());
1196 6 : if let Some(since_last_eviction) = since_last_eviction {
1197 6 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1198 6 : }
1199 :
1200 6 : self.access_stats.record_residence_event();
1201 6 :
1202 6 : Ok(self.initialize_after_layer_is_on_disk(permit))
1203 : }
1204 0 : Err(e) => {
1205 0 : let consecutive_failures =
1206 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1207 0 :
1208 0 : if timeline.cancel.is_cancelled() {
1209 : // If we're shutting down, drop out before logging the error
1210 0 : return Err(e);
1211 0 : }
1212 0 :
1213 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1214 :
1215 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1216 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1217 0 : 1.5,
1218 0 : 60.0,
1219 0 : );
1220 0 :
1221 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1222 :
1223 : tokio::select! {
1224 : _ = tokio::time::sleep(backoff) => {},
1225 : _ = timeline.cancel.cancelled() => {},
1226 : };
1227 :
1228 0 : Err(e)
1229 : }
1230 : }
1231 6 : }
1232 :
1233 : /// Initializes the `Self::inner` to a "resident" state.
1234 : ///
1235 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1236 : /// before calling this method.
1237 : ///
1238 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1239 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1240 12 : fn initialize_after_layer_is_on_disk(
1241 12 : self: &Arc<LayerInner>,
1242 12 : permit: heavier_once_cell::InitPermit,
1243 12 : ) -> Arc<DownloadedLayer> {
1244 12 : debug_assert_current_span_has_tenant_and_timeline_id();
1245 12 :
1246 12 : // disable any scheduled but not yet running eviction deletions for this initialization
1247 12 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1248 12 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1249 12 :
1250 12 : let res = Arc::new(DownloadedLayer {
1251 12 : owner: Arc::downgrade(self),
1252 12 : kind: tokio::sync::OnceCell::default(),
1253 12 : version: next_version,
1254 12 : });
1255 12 :
1256 12 : let waiters = self.inner.initializer_count();
1257 12 : if waiters > 0 {
1258 0 : tracing::info!(waiters, "completing layer init for other tasks");
1259 12 : }
1260 :
1261 12 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1262 12 :
1263 12 : self.inner.set(value, permit);
1264 12 :
1265 12 : res
1266 12 : }
1267 :
1268 24 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1269 24 : match tokio::fs::metadata(&self.path).await {
1270 16 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1271 8 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1272 0 : Err(e) => Err(e),
1273 : }
1274 24 : }
1275 :
1276 24 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1277 24 : match self.path.metadata() {
1278 24 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1279 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1280 0 : Err(e) => Err(e),
1281 : }
1282 24 : }
1283 :
1284 40 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1285 40 : // in future, this should include sha2-256 validation of the file.
1286 40 : if !m.is_file() {
1287 0 : Err(NeedsDownload::NotFile(m.file_type()))
1288 40 : } else if m.len() != self.desc.file_size {
1289 0 : Err(NeedsDownload::WrongSize {
1290 0 : actual: m.len(),
1291 0 : expected: self.desc.file_size,
1292 0 : })
1293 : } else {
1294 40 : Ok(())
1295 : }
1296 40 : }
1297 :
1298 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1299 0 : let layer_name = self.desc.layer_name().to_string();
1300 0 :
1301 0 : let resident = self
1302 0 : .inner
1303 0 : .get()
1304 0 : .map(|rowe| rowe.is_likely_resident())
1305 0 : .unwrap_or(false);
1306 0 :
1307 0 : let access_stats = self.access_stats.as_api_model(reset);
1308 0 :
1309 0 : if self.desc.is_delta {
1310 0 : let lsn_range = &self.desc.lsn_range;
1311 0 :
1312 0 : HistoricLayerInfo::Delta {
1313 0 : layer_file_name: layer_name,
1314 0 : layer_file_size: self.desc.file_size,
1315 0 : lsn_start: lsn_range.start,
1316 0 : lsn_end: lsn_range.end,
1317 0 : remote: !resident,
1318 0 : access_stats,
1319 0 : l0: crate::tenant::layer_map::LayerMap::is_l0(&self.layer_desc().key_range),
1320 0 : }
1321 : } else {
1322 0 : let lsn = self.desc.image_layer_lsn();
1323 0 :
1324 0 : HistoricLayerInfo::Image {
1325 0 : layer_file_name: layer_name,
1326 0 : layer_file_size: self.desc.file_size,
1327 0 : lsn_start: lsn,
1328 0 : remote: !resident,
1329 0 : access_stats,
1330 0 : }
1331 : }
1332 0 : }
1333 :
1334 : /// `DownloadedLayer` is being dropped, so it calls this method.
1335 24 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1336 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1337 : // though here it is very likely
1338 24 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1339 :
1340 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1341 : // drop while the `self.inner` is being locked, leading to a deadlock.
1342 :
1343 24 : let start_evicting = async move {
1344 24 : #[cfg(test)]
1345 24 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1346 14 : .await
1347 24 : .expect("failpoint should not have errored");
1348 24 :
1349 24 : tracing::debug!("eviction started");
1350 :
1351 24 : let res = self.wait_for_turn_and_evict(only_version).await;
1352 : // metrics: ignore the Ok branch, it is not done yet
1353 24 : if let Err(e) = res {
1354 6 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1355 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1356 18 : }
1357 24 : };
1358 :
1359 24 : Self::spawn(start_evicting.instrument(span));
1360 24 : }
1361 :
1362 24 : async fn wait_for_turn_and_evict(
1363 24 : self: Arc<LayerInner>,
1364 24 : only_version: usize,
1365 24 : ) -> Result<(), EvictionCancelled> {
1366 46 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1367 46 : use Status::*;
1368 46 : match status {
1369 44 : Resident => Ok(()),
1370 2 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1371 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1372 : }
1373 46 : }
1374 :
1375 24 : let timeline = self
1376 24 : .timeline
1377 24 : .upgrade()
1378 24 : .ok_or(EvictionCancelled::TimelineGone)?;
1379 :
1380 24 : let mut rx = self
1381 24 : .status
1382 24 : .as_ref()
1383 24 : .expect("LayerInner cannot be dropped, holding strong ref")
1384 24 : .subscribe();
1385 24 :
1386 24 : is_good_to_continue(&rx.borrow_and_update())?;
1387 :
1388 22 : let Ok(gate) = timeline.gate.enter() else {
1389 0 : return Err(EvictionCancelled::TimelineGone);
1390 : };
1391 :
1392 18 : let permit = {
1393 : // we cannot just `std::fs::remove_file` because there might already be an
1394 : // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
1395 : // operations must be done while holding the heavier_once_cell::InitPermit
1396 22 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1397 :
1398 22 : let waited = loop {
1399 22 : // we must race to the Downloading starting, otherwise we would have to wait until the
1400 22 : // completion of the download. waiting for download could be long and hinder our
1401 22 : // efforts to alert on "hanging" evictions.
1402 22 : tokio::select! {
1403 : res = &mut wait => break res,
1404 : _ = rx.changed() => {
1405 : is_good_to_continue(&rx.borrow_and_update())?;
1406 : // two possibilities for Status::Resident:
1407 : // - the layer was found locally from disk by a read
1408 : // - we missed a bunch of updates and now the layer is
1409 : // again downloaded -- assume we'll fail later on with
1410 : // version check or AlreadyReinitialized
1411 : }
1412 22 : }
1413 22 : };
1414 :
1415 : // re-check now that we have the guard or permit; all updates should have happened
1416 : // while holding the permit.
1417 22 : is_good_to_continue(&rx.borrow_and_update())?;
1418 :
1419 : // the term deinitialize is used here, because we clearing out the Weak will eventually
1420 : // lead to deallocating the reference counted value, and the value we
1421 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1422 22 : let (_weak, permit) = match waited {
1423 20 : Ok(guard) => {
1424 20 : match &*guard {
1425 18 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1426 18 : if *version == only_version =>
1427 16 : {
1428 16 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1429 16 : let (weak, permit) = guard.take_and_deinit();
1430 16 : (Some(weak), permit)
1431 : }
1432 2 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1433 2 : // if we were not doing the version check, we would need to try to
1434 2 : // upgrade the weak here to see if it really is dropped. version check
1435 2 : // is done instead assuming that it is cheaper.
1436 2 : tracing::debug!(
1437 : version,
1438 : only_version,
1439 0 : "version mismatch, not deinitializing"
1440 : );
1441 2 : return Err(EvictionCancelled::VersionCheckFailed);
1442 : }
1443 : ResidentOrWantedEvicted::Resident(_) => {
1444 2 : return Err(EvictionCancelled::AlreadyReinitialized);
1445 : }
1446 : }
1447 : }
1448 2 : Err(permit) => {
1449 2 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1450 2 : (None, permit)
1451 : }
1452 : };
1453 :
1454 18 : permit
1455 18 : };
1456 18 :
1457 18 : let span = tracing::Span::current();
1458 18 :
1459 18 : let spawned_at = std::time::Instant::now();
1460 18 :
1461 18 : // this is on purpose a detached spawn; we don't need to wait for it
1462 18 : //
1463 18 : // eviction completion reporting is the only thing hinging on this, and it can be just as
1464 18 : // well from a spawn_blocking thread.
1465 18 : //
1466 18 : // important to note that now that we've acquired the permit we have made sure the evicted
1467 18 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1468 18 : // there are multiple evictions. The rest is not cancellable, and we've now commited to
1469 18 : // evicting.
1470 18 : //
1471 18 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1472 18 : // reads. We will need to add cancellation for that if necessary.
1473 18 : Self::spawn_blocking(move || {
1474 18 : let _span = span.entered();
1475 18 :
1476 18 : let res = self.evict_blocking(&timeline, &gate, &permit);
1477 18 :
1478 18 : let waiters = self.inner.initializer_count();
1479 18 :
1480 18 : if waiters > 0 {
1481 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1482 18 : }
1483 :
1484 18 : let completed_in = spawned_at.elapsed();
1485 18 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1486 18 :
1487 18 : match res {
1488 18 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1489 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1490 : }
1491 :
1492 18 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1493 18 : });
1494 18 :
1495 18 : Ok(())
1496 24 : }
1497 :
1498 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1499 18 : fn evict_blocking(
1500 18 : &self,
1501 18 : timeline: &Timeline,
1502 18 : _gate: &gate::GateGuard,
1503 18 : _permit: &heavier_once_cell::InitPermit,
1504 18 : ) -> Result<(), EvictionCancelled> {
1505 18 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1506 18 :
1507 18 : match capture_mtime_and_remove(&self.path) {
1508 18 : Ok(local_layer_mtime) => {
1509 18 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1510 18 : match duration {
1511 18 : Ok(elapsed) => {
1512 18 : let accessed = self.access_stats.accessed();
1513 18 : if accessed {
1514 4 : // Only layers used for reads contribute to our "low residence" metric that is used
1515 4 : // to detect thrashing. Layers promoted for other reasons (e.g. compaction) are allowed
1516 4 : // to be rapidly evicted without contributing to this metric.
1517 4 : timeline
1518 4 : .metrics
1519 4 : .evictions_with_low_residence_duration
1520 4 : .read()
1521 4 : .unwrap()
1522 4 : .observe(elapsed);
1523 14 : }
1524 :
1525 18 : tracing::info!(
1526 0 : residence_millis = elapsed.as_millis(),
1527 0 : accessed,
1528 0 : "evicted layer after known residence period"
1529 : );
1530 : }
1531 : Err(_) => {
1532 0 : tracing::info!("evicted layer after unknown residence period");
1533 : }
1534 : }
1535 18 : timeline.metrics.evictions.inc();
1536 18 : timeline
1537 18 : .metrics
1538 18 : .resident_physical_size_sub(self.desc.file_size);
1539 : }
1540 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1541 0 : tracing::error!(
1542 : layer_size = %self.desc.file_size,
1543 0 : "failed to evict layer from disk, it was already gone"
1544 : );
1545 0 : return Err(EvictionCancelled::FileNotFound);
1546 : }
1547 0 : Err(e) => {
1548 0 : // FIXME: this should probably be an abort
1549 0 : tracing::error!("failed to evict file from disk: {e:#}");
1550 0 : return Err(EvictionCancelled::RemoveFailed);
1551 : }
1552 : }
1553 :
1554 18 : self.access_stats.record_residence_event();
1555 18 :
1556 18 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1557 18 :
1558 18 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1559 18 :
1560 18 : Ok(())
1561 18 : }
1562 :
1563 2402 : fn metadata(&self) -> LayerFileMetadata {
1564 2402 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1565 2402 : }
1566 :
1567 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1568 : ///
1569 : /// Synchronizing with spawned tasks is very complicated otherwise.
1570 30 : fn spawn<F>(fut: F)
1571 30 : where
1572 30 : F: std::future::Future<Output = ()> + Send + 'static,
1573 30 : {
1574 30 : #[cfg(test)]
1575 30 : tokio::task::spawn(fut);
1576 30 : #[cfg(not(test))]
1577 30 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1578 30 : }
1579 :
1580 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1581 472 : fn spawn_blocking<F>(f: F)
1582 472 : where
1583 472 : F: FnOnce() + Send + 'static,
1584 472 : {
1585 472 : #[cfg(test)]
1586 472 : tokio::task::spawn_blocking(f);
1587 472 : #[cfg(not(test))]
1588 472 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1589 472 : }
1590 : }
1591 :
1592 18 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1593 18 : let m = path.metadata()?;
1594 18 : let local_layer_mtime = m.modified()?;
1595 18 : std::fs::remove_file(path)?;
1596 18 : Ok(local_layer_mtime)
1597 18 : }
1598 :
1599 0 : #[derive(Debug, thiserror::Error)]
1600 : pub(crate) enum EvictionError {
1601 : #[error("layer was already evicted")]
1602 : NotFound,
1603 :
1604 : /// Evictions must always lose to downloads in races, and this time it happened.
1605 : #[error("layer was downloaded instead")]
1606 : Downloaded,
1607 :
1608 : #[error("eviction did not happen within timeout")]
1609 : Timeout,
1610 : }
1611 :
1612 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1613 0 : #[derive(Debug, thiserror::Error)]
1614 : pub(crate) enum DownloadError {
1615 : #[error("timeline has already shutdown")]
1616 : TimelineShutdown,
1617 : #[error("context denies downloading")]
1618 : ContextAndConfigReallyDeniesDownloads,
1619 : #[error("downloading is really required but not allowed by this method")]
1620 : DownloadRequired,
1621 : #[error("layer path exists, but it is not a file: {0:?}")]
1622 : NotFile(std::fs::FileType),
1623 : /// Why no error here? Because it will be reported by page_service. We should had also done
1624 : /// retries already.
1625 : #[error("downloading evicted layer file failed")]
1626 : DownloadFailed,
1627 : #[error("downloading failed, possibly for shutdown")]
1628 : DownloadCancelled,
1629 : #[error("pre-condition: stat before download failed")]
1630 : PreStatFailed(#[source] std::io::Error),
1631 :
1632 : #[cfg(test)]
1633 : #[error("failpoint: {0:?}")]
1634 : Failpoint(failpoints::FailpointKind),
1635 : }
1636 :
1637 : #[derive(Debug, PartialEq)]
1638 : pub(crate) enum NeedsDownload {
1639 : NotFound,
1640 : NotFile(std::fs::FileType),
1641 : WrongSize { actual: u64, expected: u64 },
1642 : }
1643 :
1644 : impl std::fmt::Display for NeedsDownload {
1645 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1646 6 : match self {
1647 6 : NeedsDownload::NotFound => write!(f, "file was not found"),
1648 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1649 0 : NeedsDownload::WrongSize { actual, expected } => {
1650 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1651 : }
1652 : }
1653 6 : }
1654 : }
1655 :
1656 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1657 : pub(crate) struct DownloadedLayer {
1658 : owner: Weak<LayerInner>,
1659 : // Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
1660 : // DownloadedLayer
1661 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
1662 : version: usize,
1663 : }
1664 :
1665 : impl std::fmt::Debug for DownloadedLayer {
1666 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1667 0 : f.debug_struct("DownloadedLayer")
1668 0 : // owner omitted because it is always "Weak"
1669 0 : .field("kind", &self.kind)
1670 0 : .field("version", &self.version)
1671 0 : .finish()
1672 0 : }
1673 : }
1674 :
1675 : impl Drop for DownloadedLayer {
1676 542 : fn drop(&mut self) {
1677 542 : if let Some(owner) = self.owner.upgrade() {
1678 24 : owner.on_downloaded_layer_drop(self.version);
1679 518 : } else {
1680 518 : // Layer::drop will handle cancelling the eviction; because of drop order and
1681 518 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1682 518 : }
1683 542 : }
1684 : }
1685 :
1686 : impl DownloadedLayer {
1687 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`].
1688 : /// Failure to load the layer is sticky, i.e., future `get()` calls will return
1689 : /// the initial load failure immediately.
1690 : ///
1691 : /// `owner` parameter is a strong reference at the same `LayerInner` as the
1692 : /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
1693 : /// we will always have the LayerInner on the callstack, so we can just use it.
1694 213189 : async fn get<'a>(
1695 213189 : &'a self,
1696 213189 : owner: &Arc<LayerInner>,
1697 213189 : ctx: &RequestContext,
1698 213189 : ) -> anyhow::Result<&'a LayerKind> {
1699 213189 : let init = || async {
1700 1148 : assert_eq!(
1701 1148 : Weak::as_ptr(&self.owner),
1702 1148 : Arc::as_ptr(owner),
1703 1148 : "these are the same, just avoiding the upgrade"
1704 1148 : );
1705 1148 :
1706 1148 : let res = if owner.desc.is_delta {
1707 1148 : let summary = Some(delta_layer::Summary::expected(
1708 1050 : owner.desc.tenant_shard_id.tenant_id,
1709 1050 : owner.desc.timeline_id,
1710 1050 : owner.desc.key_range.clone(),
1711 1050 : owner.desc.lsn_range.clone(),
1712 1050 : ));
1713 1050 : delta_layer::DeltaLayerInner::load(
1714 1050 : &owner.path,
1715 1050 : summary,
1716 1050 : Some(owner.conf.max_vectored_read_bytes),
1717 1050 : ctx,
1718 1050 : )
1719 1148 : .await
1720 1148 : .map(LayerKind::Delta)
1721 1148 : } else {
1722 1148 : let lsn = owner.desc.image_layer_lsn();
1723 98 : let summary = Some(image_layer::Summary::expected(
1724 98 : owner.desc.tenant_shard_id.tenant_id,
1725 98 : owner.desc.timeline_id,
1726 98 : owner.desc.key_range.clone(),
1727 98 : lsn,
1728 98 : ));
1729 98 : image_layer::ImageLayerInner::load(
1730 98 : &owner.path,
1731 98 : lsn,
1732 98 : summary,
1733 98 : Some(owner.conf.max_vectored_read_bytes),
1734 98 : ctx,
1735 98 : )
1736 1148 : .await
1737 1148 : .map(LayerKind::Image)
1738 1148 : };
1739 1148 :
1740 1148 : match res {
1741 1148 : Ok(layer) => Ok(layer),
1742 1148 : Err(err) => {
1743 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1744 0 : // We log this message once over the lifetime of `Self`
1745 0 : // => Ok and good to log backtrace and path here.
1746 0 : tracing::error!(
1747 1148 : "layer load failed, assuming permanent failure: {}: {err:?}",
1748 0 : owner.path
1749 1148 : );
1750 1148 : Err(err)
1751 1148 : }
1752 1148 : }
1753 1148 : };
1754 213189 : self.kind
1755 213189 : .get_or_init(init)
1756 1154 : .await
1757 213189 : .as_ref()
1758 213189 : // We already logged the full backtrace above, once. Don't repeat that here.
1759 213189 : .map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
1760 213189 : }
1761 :
1762 324 : async fn get_value_reconstruct_data(
1763 324 : &self,
1764 324 : key: Key,
1765 324 : lsn_range: Range<Lsn>,
1766 324 : reconstruct_data: &mut ValueReconstructState,
1767 324 : owner: &Arc<LayerInner>,
1768 324 : ctx: &RequestContext,
1769 324 : ) -> anyhow::Result<ValueReconstructResult> {
1770 324 : use LayerKind::*;
1771 324 :
1772 324 : match self.get(owner, ctx).await? {
1773 320 : Delta(d) => {
1774 320 : d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
1775 17 : .await
1776 : }
1777 4 : Image(i) => {
1778 4 : i.get_value_reconstruct_data(key, reconstruct_data, ctx)
1779 4 : .await
1780 : }
1781 : }
1782 324 : }
1783 :
1784 211905 : async fn get_values_reconstruct_data(
1785 211905 : &self,
1786 211905 : keyspace: KeySpace,
1787 211905 : lsn_range: Range<Lsn>,
1788 211905 : reconstruct_data: &mut ValuesReconstructState,
1789 211905 : owner: &Arc<LayerInner>,
1790 211905 : ctx: &RequestContext,
1791 211905 : ) -> Result<(), GetVectoredError> {
1792 211905 : use LayerKind::*;
1793 211905 :
1794 211905 : match self
1795 211905 : .get(owner, ctx)
1796 860 : .await
1797 211905 : .map_err(GetVectoredError::Other)?
1798 : {
1799 204299 : Delta(d) => {
1800 204299 : d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
1801 92200 : .await
1802 : }
1803 7606 : Image(i) => {
1804 7606 : i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
1805 5363 : .await
1806 : }
1807 : }
1808 211905 : }
1809 :
1810 0 : async fn load_key_values(
1811 0 : &self,
1812 0 : owner: &Arc<LayerInner>,
1813 0 : ctx: &RequestContext,
1814 0 : ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
1815 0 : use LayerKind::*;
1816 0 :
1817 0 : match self.get(owner, ctx).await? {
1818 0 : Delta(d) => d.load_key_values(ctx).await,
1819 0 : Image(i) => i.load_key_values(ctx).await,
1820 : }
1821 0 : }
1822 :
1823 4 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1824 4 : use LayerKind::*;
1825 4 : match self.get(owner, ctx).await? {
1826 4 : Delta(d) => d.dump(ctx).await?,
1827 0 : Image(i) => i.dump(ctx).await?,
1828 : }
1829 :
1830 4 : Ok(())
1831 4 : }
1832 : }
1833 :
/// Wrapper around an actual layer implementation.
///
/// `DownloadedLayer::get` builds the variant from the owner's `desc.is_delta` flag when the
/// layer file is first loaded.
#[derive(Debug)]
enum LayerKind {
    /// A loaded delta layer.
    Delta(delta_layer::DeltaLayerInner),
    /// A loaded image layer.
    Image(image_layer::ImageLayerInner),
}
1840 :
/// Guard for forcing a layer to be resident while it exists.
///
/// Pairs the [`Layer`] with a strong reference to its [`DownloadedLayer`]. Cloning the guard
/// clones both handles. Use `drop_eviction_guard` or `From` to get back a plain [`Layer`].
#[derive(Clone)]
pub(crate) struct ResidentLayer {
    owner: Layer,
    // NOTE(review): struct fields drop in declaration order, so `owner` is released before
    // `downloaded`; `DownloadedLayer::drop` reasons about drop order, so keep this order.
    downloaded: Arc<DownloadedLayer>,
}
1847 :
1848 : impl std::fmt::Display for ResidentLayer {
1849 1918 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1850 1918 : write!(f, "{}", self.owner)
1851 1918 : }
1852 : }
1853 :
1854 : impl std::fmt::Debug for ResidentLayer {
1855 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1856 0 : write!(f, "{}", self.owner)
1857 0 : }
1858 : }
1859 :
1860 : impl ResidentLayer {
1861 : /// Release the eviction guard, converting back into a plain [`Layer`].
1862 : ///
1863 : /// You can access the [`Layer`] also by using `as_ref`.
1864 420 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1865 420 : self.into()
1866 420 : }
1867 :
1868 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1869 804 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1870 : pub(crate) async fn load_keys<'a>(
1871 : &'a self,
1872 : ctx: &RequestContext,
1873 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1874 : use LayerKind::*;
1875 :
1876 : let owner = &self.owner.0;
1877 : match self.downloaded.get(owner, ctx).await? {
1878 : Delta(ref d) => {
1879 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1880 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1881 : // while it's being held.
1882 : owner.access_stats.record_access(ctx);
1883 :
1884 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1885 : .await
1886 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1887 : }
1888 : Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
1889 : }
1890 : }
1891 :
1892 : /// Read all they keys in this layer which match the ShardIdentity, and write them all to
1893 : /// the provided writer. Return the number of keys written.
1894 16 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1895 : pub(crate) async fn filter<'a>(
1896 : &'a self,
1897 : shard_identity: &ShardIdentity,
1898 : writer: &mut ImageLayerWriter,
1899 : ctx: &RequestContext,
1900 : ) -> Result<usize, CompactionError> {
1901 : use LayerKind::*;
1902 :
1903 : match self
1904 : .downloaded
1905 : .get(&self.owner.0, ctx)
1906 : .await
1907 : .map_err(CompactionError::Other)?
1908 : {
1909 : Delta(_) => {
1910 : return Err(CompactionError::Other(anyhow::anyhow!(format!(
1911 : "cannot filter() on a delta layer {self}"
1912 : ))));
1913 : }
1914 : Image(i) => i
1915 : .filter(shard_identity, writer, ctx)
1916 : .await
1917 : .map_err(CompactionError::Other),
1918 : }
1919 : }
1920 :
1921 : /// Returns the amount of keys and values written to the writer.
1922 10 : pub(crate) async fn copy_delta_prefix(
1923 10 : &self,
1924 10 : writer: &mut super::delta_layer::DeltaLayerWriter,
1925 10 : until: Lsn,
1926 10 : ctx: &RequestContext,
1927 10 : ) -> anyhow::Result<usize> {
1928 10 : use LayerKind::*;
1929 10 :
1930 10 : let owner = &self.owner.0;
1931 10 :
1932 10 : match self.downloaded.get(owner, ctx).await? {
1933 10 : Delta(ref d) => d
1934 10 : .copy_prefix(writer, until, ctx)
1935 13 : .await
1936 10 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1937 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1938 : }
1939 10 : }
1940 :
1941 1625 : pub(crate) fn local_path(&self) -> &Utf8Path {
1942 1625 : &self.owner.0.path
1943 1625 : }
1944 :
1945 1536 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1946 1536 : self.owner.metadata()
1947 1536 : }
1948 :
1949 : /// Cast the layer to a delta, return an error if it is an image layer.
1950 512 : pub(crate) async fn get_as_delta(
1951 512 : &self,
1952 512 : ctx: &RequestContext,
1953 512 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1954 512 : use LayerKind::*;
1955 512 : match self.downloaded.get(&self.owner.0, ctx).await? {
1956 512 : Delta(ref d) => Ok(d),
1957 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1958 : }
1959 512 : }
1960 :
1961 : /// Cast the layer to an image, return an error if it is a delta layer.
1962 24 : pub(crate) async fn get_as_image(
1963 24 : &self,
1964 24 : ctx: &RequestContext,
1965 24 : ) -> anyhow::Result<&image_layer::ImageLayerInner> {
1966 24 : use LayerKind::*;
1967 24 : match self.downloaded.get(&self.owner.0, ctx).await? {
1968 24 : Image(ref d) => Ok(d),
1969 0 : Delta(_) => Err(anyhow::anyhow!("delta layer")),
1970 : }
1971 24 : }
1972 : }
1973 :
1974 : impl AsLayerDesc for ResidentLayer {
1975 5455 : fn layer_desc(&self) -> &PersistentLayerDesc {
1976 5455 : self.owner.layer_desc()
1977 5455 : }
1978 : }
1979 :
1980 : impl AsRef<Layer> for ResidentLayer {
1981 1904 : fn as_ref(&self) -> &Layer {
1982 1904 : &self.owner
1983 1904 : }
1984 : }
1985 :
1986 : /// Drop the eviction guard.
1987 : impl From<ResidentLayer> for Layer {
1988 420 : fn from(value: ResidentLayer) -> Self {
1989 420 : value.owner
1990 420 : }
1991 : }
1992 :
1993 : use metrics::IntCounter;
1994 :
/// Counters and histograms for the `Layer` implementation: evictions, deletions on drop,
/// initializations and a set of assumed-rare events.
///
/// The `LAYER_IMPL_METRICS` static is built from this type's `Default` impl.
pub(crate) struct LayerImplMetrics {
    /// `pageserver_layer_started_evictions`
    started_evictions: IntCounter,
    /// `pageserver_layer_completed_evictions`
    completed_evictions: IntCounter,
    /// `pageserver_layer_cancelled_evictions_count`, one child per [`EvictionCancelled`] reason.
    cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,

    /// `pageserver_layer_started_deletes`
    started_deletes: IntCounter,
    /// `pageserver_layer_completed_deletes`
    completed_deletes: IntCounter,
    /// `pageserver_layer_failed_deletes_count`, one child per [`DeleteFailed`] reason.
    failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,

    /// `pageserver_layer_assumed_rare_count`, one child per [`RareEvent`].
    rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
    /// `pageserver_layer_inits_cancelled_count`
    inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
    /// `pageserver_layer_redownloaded_after`, observed in seconds.
    redownload_after: metrics::Histogram,
    /// `pageserver_layer_eviction_held_permit_seconds`, observed in seconds.
    time_to_evict: metrics::Histogram,
}
2009 :
2010 : impl Default for LayerImplMetrics {
2011 40 : fn default() -> Self {
2012 40 : use enum_map::Enum;
2013 40 :
2014 40 : // reminder: these will be pageserver_layer_* with "_total" suffix
2015 40 :
2016 40 : let started_evictions = metrics::register_int_counter!(
2017 : "pageserver_layer_started_evictions",
2018 : "Evictions started in the Layer implementation"
2019 : )
2020 40 : .unwrap();
2021 40 : let completed_evictions = metrics::register_int_counter!(
2022 : "pageserver_layer_completed_evictions",
2023 : "Evictions completed in the Layer implementation"
2024 : )
2025 40 : .unwrap();
2026 40 :
2027 40 : let cancelled_evictions = metrics::register_int_counter_vec!(
2028 : "pageserver_layer_cancelled_evictions_count",
2029 : "Different reasons for evictions to have been cancelled or failed",
2030 : &["reason"]
2031 : )
2032 40 : .unwrap();
2033 40 :
2034 360 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2035 360 : let reason = EvictionCancelled::from_usize(i);
2036 360 : let s = reason.as_str();
2037 360 : cancelled_evictions.with_label_values(&[s])
2038 360 : }));
2039 40 :
2040 40 : let started_deletes = metrics::register_int_counter!(
2041 : "pageserver_layer_started_deletes",
2042 : "Deletions on drop pending in the Layer implementation"
2043 : )
2044 40 : .unwrap();
2045 40 : let completed_deletes = metrics::register_int_counter!(
2046 : "pageserver_layer_completed_deletes",
2047 : "Deletions on drop completed in the Layer implementation"
2048 : )
2049 40 : .unwrap();
2050 40 :
2051 40 : let failed_deletes = metrics::register_int_counter_vec!(
2052 : "pageserver_layer_failed_deletes_count",
2053 : "Different reasons for deletions on drop to have failed",
2054 : &["reason"]
2055 : )
2056 40 : .unwrap();
2057 40 :
2058 80 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2059 80 : let reason = DeleteFailed::from_usize(i);
2060 80 : let s = reason.as_str();
2061 80 : failed_deletes.with_label_values(&[s])
2062 80 : }));
2063 40 :
2064 40 : let rare_counters = metrics::register_int_counter_vec!(
2065 : "pageserver_layer_assumed_rare_count",
2066 : "Times unexpected or assumed rare event happened",
2067 : &["event"]
2068 : )
2069 40 : .unwrap();
2070 40 :
2071 280 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2072 280 : let event = RareEvent::from_usize(i);
2073 280 : let s = event.as_str();
2074 280 : rare_counters.with_label_values(&[s])
2075 280 : }));
2076 40 :
2077 40 : let inits_cancelled = metrics::register_int_counter!(
2078 : "pageserver_layer_inits_cancelled_count",
2079 : "Times Layer initialization was cancelled",
2080 : )
2081 40 : .unwrap();
2082 40 :
2083 40 : let redownload_after = {
2084 40 : let minute = 60.0;
2085 40 : let hour = 60.0 * minute;
2086 : metrics::register_histogram!(
2087 : "pageserver_layer_redownloaded_after",
2088 : "Time between evicting and re-downloading.",
2089 : vec![
2090 : 10.0,
2091 : 30.0,
2092 : minute,
2093 : 5.0 * minute,
2094 : 15.0 * minute,
2095 : 30.0 * minute,
2096 : hour,
2097 : 12.0 * hour,
2098 : ]
2099 : )
2100 40 : .unwrap()
2101 40 : };
2102 40 :
2103 40 : let time_to_evict = metrics::register_histogram!(
2104 : "pageserver_layer_eviction_held_permit_seconds",
2105 : "Time eviction held the permit.",
2106 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
2107 : )
2108 40 : .unwrap();
2109 40 :
2110 40 : Self {
2111 40 : started_evictions,
2112 40 : completed_evictions,
2113 40 : cancelled_evictions,
2114 40 :
2115 40 : started_deletes,
2116 40 : completed_deletes,
2117 40 : failed_deletes,
2118 40 :
2119 40 : rare_counters,
2120 40 : inits_cancelled,
2121 40 : redownload_after,
2122 40 : time_to_evict,
2123 40 : }
2124 40 : }
2125 : }
2126 :
2127 : impl LayerImplMetrics {
2128 26 : fn inc_started_evictions(&self) {
2129 26 : self.started_evictions.inc();
2130 26 : }
2131 18 : fn inc_completed_evictions(&self) {
2132 18 : self.completed_evictions.inc();
2133 18 : }
2134 8 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2135 8 : self.cancelled_evictions[reason].inc()
2136 8 : }
2137 :
2138 458 : fn inc_started_deletes(&self) {
2139 458 : self.started_deletes.inc();
2140 458 : }
2141 454 : fn inc_completed_deletes(&self) {
2142 454 : self.completed_deletes.inc();
2143 454 : }
2144 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2145 0 : self.failed_deletes[reason].inc();
2146 0 : }
2147 :
2148 : /// Counted separatedly from failed layer deletes because we will complete the layer deletion
2149 : /// attempt regardless of failure to delete local file.
2150 0 : fn inc_delete_removes_failed(&self) {
2151 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2152 0 : }
2153 :
2154 : /// Expected rare just as cancellations are rare, but we could have cancellations separate from
2155 : /// the single caller which can start the download, so use this counter to separte them.
2156 0 : fn inc_init_completed_without_requester(&self) {
2157 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2158 0 : }
2159 :
2160 : /// Expected rare because cancellations are unexpected, and failures are unexpected
2161 0 : fn inc_download_failed_without_requester(&self) {
2162 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2163 0 : }
2164 :
2165 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2166 : ///
2167 : /// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
2168 : /// Option.
2169 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2170 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2171 0 : }
2172 :
2173 : /// These are only expected for [`Self::inc_init_cancelled`] amount when
2174 : /// running with remote storage.
2175 6 : fn inc_init_needed_no_download(&self) {
2176 6 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2177 6 : }
2178 :
2179 : /// Expected rare because all layer files should be readable and good
2180 0 : fn inc_permanent_loading_failures(&self) {
2181 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2182 0 : }
2183 :
2184 0 : fn inc_init_cancelled(&self) {
2185 0 : self.inits_cancelled.inc()
2186 0 : }
2187 :
2188 6 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2189 6 : self.redownload_after.observe(duration.as_secs_f64())
2190 6 : }
2191 :
2192 : /// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
2193 : /// instead cancel eviction if we would have read waiters. We cannot however separate reads
2194 : /// from other evictions, so this could have noise as well.
2195 0 : fn inc_evicted_with_waiters(&self) {
2196 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2197 0 : }
2198 :
2199 : /// Recorded at least initially as the permit is now acquired in async context before
2200 : /// spawn_blocking action.
2201 18 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2202 18 : self.time_to_evict.observe(duration.as_secs_f64())
2203 18 : }
2204 : }
2205 :
/// Why an eviction was cancelled or failed.
///
/// Each variant is one `reason` label of `pageserver_layer_cancelled_evictions_count`. The label
/// text comes from `EvictionCancelled::as_str`. The per-variant counter is stored in an
/// `enum_map::EnumMap` keyed by this enum.
#[derive(Debug, Clone, Copy, enum_map::Enum)]
enum EvictionCancelled {
    LayerGone,
    TimelineGone,
    VersionCheckFailed,
    FileNotFound,
    RemoveFailed,
    AlreadyReinitialized,
    /// Not evicted because of a pending reinitialization
    LostToDownload,
    /// After eviction, there was a new layer access which cancelled the eviction.
    UpgradedBackOnAccess,
    UnexpectedEvictedState,
}
2220 :
2221 : impl EvictionCancelled {
2222 360 : fn as_str(&self) -> &'static str {
2223 360 : match self {
2224 40 : EvictionCancelled::LayerGone => "layer_gone",
2225 40 : EvictionCancelled::TimelineGone => "timeline_gone",
2226 40 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2227 40 : EvictionCancelled::FileNotFound => "file_not_found",
2228 40 : EvictionCancelled::RemoveFailed => "remove_failed",
2229 40 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2230 40 : EvictionCancelled::LostToDownload => "lost_to_download",
2231 40 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2232 40 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2233 : }
2234 360 : }
2235 : }
2236 :
/// Why a deletion-on-drop failed.
///
/// Each variant is one `reason` label of `pageserver_layer_failed_deletes_count`. The label text
/// comes from `DeleteFailed::as_str`.
#[derive(enum_map::Enum)]
enum DeleteFailed {
    TimelineGone,
    DeleteSchedulingFailed,
}
2242 :
2243 : impl DeleteFailed {
2244 80 : fn as_str(&self) -> &'static str {
2245 80 : match self {
2246 40 : DeleteFailed::TimelineGone => "timeline_gone",
2247 40 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2248 : }
2249 80 : }
2250 : }
2251 :
/// Events assumed to be rare, each counted under one `event` label of
/// `pageserver_layer_assumed_rare_count`. The label text comes from `RareEvent::as_str`.
#[derive(enum_map::Enum)]
enum RareEvent {
    /// Bumped by `LayerImplMetrics::inc_delete_removes_failed`.
    RemoveOnDropFailed,
    /// Bumped by `LayerImplMetrics::inc_init_completed_without_requester`.
    InitCompletedWithoutRequester,
    /// Bumped by `LayerImplMetrics::inc_download_failed_without_requester`.
    DownloadFailedWithoutRequester,
    /// Bumped by `LayerImplMetrics::inc_raced_wanted_evicted_accesses`.
    UpgradedWantedEvicted,
    /// Bumped by `LayerImplMetrics::inc_init_needed_no_download`.
    InitWithoutDownload,
    /// Bumped by `LayerImplMetrics::inc_permanent_loading_failures`.
    PermanentLoadingFailure,
    /// Bumped by `LayerImplMetrics::inc_evicted_with_waiters`.
    EvictedWithWaiters,
}
2262 :
2263 : impl RareEvent {
2264 280 : fn as_str(&self) -> &'static str {
2265 280 : use RareEvent::*;
2266 280 :
2267 280 : match self {
2268 40 : RemoveOnDropFailed => "remove_on_drop_failed",
2269 40 : InitCompletedWithoutRequester => "init_completed_without",
2270 40 : DownloadFailedWithoutRequester => "download_failed_without",
2271 40 : UpgradedWantedEvicted => "raced_wanted_evicted",
2272 40 : InitWithoutDownload => "init_needed_no_download",
2273 40 : PermanentLoadingFailure => "permanent_loading_failure",
2274 40 : EvictedWithWaiters => "evicted_with_waiters",
2275 : }
2276 280 : }
2277 : }
2278 :
2279 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2280 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|