Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::HistoricLayerInfo;
5 : use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
6 : use std::ops::Range;
7 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 : use std::sync::{Arc, Weak};
9 : use std::time::{Duration, SystemTime};
10 : use tracing::Instrument;
11 : use utils::id::TimelineId;
12 : use utils::lsn::Lsn;
13 : use utils::sync::{gate, heavier_once_cell};
14 :
15 : use crate::config::PageServerConf;
16 : use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
17 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
18 : use crate::task_mgr::TaskKind;
19 : use crate::tenant::timeline::{CompactionError, GetVectoredError};
20 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
21 :
22 : use super::delta_layer::{self};
23 : use super::image_layer::{self};
24 : use super::{
25 : AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
26 : LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
27 : };
28 :
29 : use utils::generation::Generation;
30 :
31 : #[cfg(test)]
32 : mod tests;
33 :
34 : #[cfg(test)]
35 : mod failpoints;
36 :
/// NOTE(review): presumably an upper bound, in bytes, on the size of a single layer file that
/// may be uploaded to S3; the usage sites are not visible in this file, so confirm there.
pub const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
38 :
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
///
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access to the
/// recent page versions. On-disk layers are stored as files on disk, and are
/// immutable. This type represents the on-disk kind while in-memory kind are represented by
/// [`InMemoryLayer`].
///
/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
/// A delta layer contains all modifications within a range of LSNs and keys.
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN.
///
/// This type models the on-disk layers, which can be evicted and on-demand downloaded. As a
/// general goal, read accesses should always win eviction and eviction should not wait for
/// download.
///
/// ### State transitions
///
/// The internal state of `Layer` is composed of most importantly the on-filesystem state and the
/// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
/// right size) or deleted.
///
/// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
/// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
/// `heavier_once_cell::InitPermit` has been acquired, any read request
/// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
/// cancelling the eviction.
///
/// ```text
/// +-----------------+   get_or_maybe_download    +--------------------------------+
/// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
/// |     ENOENT      |                         /->|                                |
/// +-----------------+                         |  +--------------------------------+
///                 ^                           |                         |       ^
///                 |    get_or_maybe_download  |                         |       | get_or_maybe_download, either:
///  evict_blocking | /-------------------------/                         |       | - upgrade weak to strong
///                 | |                                                   |       | - re-initialize without download
///                 | |                                    evict_and_wait |       |
/// +-----------------+                                                   v       |
/// | not initialized |  on_downloaded_layer_drop  +--------------------------------------+
/// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
/// +-----------------+                            +--------------------------------------+
/// ```
///
/// ### Unsupported
///
/// - Evicting by the operator deleting files from the filesystem
///
/// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
#[derive(Clone)]
pub(crate) struct Layer(Arc<LayerInner>);
92 :
93 : impl std::fmt::Display for Layer {
94 4296 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 4296 : write!(
96 4296 : f,
97 4296 : "{}{}",
98 4296 : self.layer_desc().short_id(),
99 4296 : self.0.generation.get_suffix()
100 4296 : )
101 4296 : }
102 : }
103 :
104 : impl std::fmt::Debug for Layer {
105 8 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 8 : write!(f, "{}", self)
107 8 : }
108 : }
109 :
110 : impl AsLayerDesc for Layer {
111 3842501 : fn layer_desc(&self) -> &PersistentLayerDesc {
112 3842501 : self.0.layer_desc()
113 3842501 : }
114 : }
115 :
116 : impl PartialEq for Layer {
117 5 : fn eq(&self, other: &Self) -> bool {
118 5 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
119 5 : }
120 : }
121 :
122 3812 : pub(crate) fn local_layer_path(
123 3812 : conf: &PageServerConf,
124 3812 : tenant_shard_id: &TenantShardId,
125 3812 : timeline_id: &TimelineId,
126 3812 : layer_file_name: &LayerName,
127 3812 : generation: &Generation,
128 3812 : ) -> Utf8PathBuf {
129 3812 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
130 3812 :
131 3812 : if generation.is_none() {
132 : // Without a generation, we may only use legacy path style
133 0 : timeline_path.join(layer_file_name.to_string())
134 : } else {
135 3812 : timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
136 : }
137 3812 : }
138 :
/// Answer to the question "when was this layer last evicted?".
pub(crate) enum LastEviction {
    /// No eviction timestamp is recorded.
    Never,
    /// The recorded eviction timestamp.
    At(std::time::Instant),
    /// The eviction state could not be read without blocking; treated as "being evicted now".
    Evicting,
}

impl LastEviction {
    /// Returns `true` when the eviction is strictly later than `timepoint`.
    ///
    /// [`LastEviction::Evicting`] always counts as later, [`LastEviction::Never`] never does.
    pub(crate) fn happened_after(&self, timepoint: std::time::Instant) -> bool {
        match *self {
            Self::At(evicted_at) => evicted_at > timepoint,
            Self::Evicting => true,
            Self::Never => false,
        }
    }
}
154 :
155 : impl Layer {
156 : /// Creates a layer value for a file we know to not be resident.
157 0 : pub(crate) fn for_evicted(
158 0 : conf: &'static PageServerConf,
159 0 : timeline: &Arc<Timeline>,
160 0 : file_name: LayerName,
161 0 : metadata: LayerFileMetadata,
162 0 : ) -> Self {
163 0 : let local_path = local_layer_path(
164 0 : conf,
165 0 : &timeline.tenant_shard_id,
166 0 : &timeline.timeline_id,
167 0 : &file_name,
168 0 : &metadata.generation,
169 0 : );
170 0 :
171 0 : let desc = PersistentLayerDesc::from_filename(
172 0 : timeline.tenant_shard_id,
173 0 : timeline.timeline_id,
174 0 : file_name,
175 0 : metadata.file_size,
176 0 : );
177 0 :
178 0 : let owner = Layer(Arc::new(LayerInner::new(
179 0 : conf,
180 0 : timeline,
181 0 : local_path,
182 0 : desc,
183 0 : None,
184 0 : metadata.generation,
185 0 : metadata.shard,
186 0 : )));
187 0 :
188 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
189 :
190 0 : owner
191 0 : }
192 :
193 : /// Creates a Layer value for a file we know to be resident in timeline directory.
194 244 : pub(crate) fn for_resident(
195 244 : conf: &'static PageServerConf,
196 244 : timeline: &Arc<Timeline>,
197 244 : local_path: Utf8PathBuf,
198 244 : file_name: LayerName,
199 244 : metadata: LayerFileMetadata,
200 244 : ) -> ResidentLayer {
201 244 : let desc = PersistentLayerDesc::from_filename(
202 244 : timeline.tenant_shard_id,
203 244 : timeline.timeline_id,
204 244 : file_name,
205 244 : metadata.file_size,
206 244 : );
207 244 :
208 244 : let mut resident = None;
209 244 :
210 244 : let owner = Layer(Arc::new_cyclic(|owner| {
211 244 : let inner = Arc::new(DownloadedLayer {
212 244 : owner: owner.clone(),
213 244 : kind: tokio::sync::OnceCell::default(),
214 244 : version: 0,
215 244 : });
216 244 : resident = Some(inner.clone());
217 244 :
218 244 : LayerInner::new(
219 244 : conf,
220 244 : timeline,
221 244 : local_path,
222 244 : desc,
223 244 : Some(inner),
224 244 : metadata.generation,
225 244 : metadata.shard,
226 244 : )
227 244 : }));
228 244 :
229 244 : let downloaded = resident.expect("just initialized");
230 244 :
231 244 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
232 :
233 244 : timeline
234 244 : .metrics
235 244 : .resident_physical_size_add(metadata.file_size);
236 244 :
237 244 : ResidentLayer { downloaded, owner }
238 244 : }
239 :
240 : /// Creates a Layer value for freshly written out new layer file by renaming it from a
241 : /// temporary path.
242 3600 : pub(crate) fn finish_creating(
243 3600 : conf: &'static PageServerConf,
244 3600 : timeline: &Arc<Timeline>,
245 3600 : desc: PersistentLayerDesc,
246 3600 : temp_path: &Utf8Path,
247 3600 : ) -> anyhow::Result<ResidentLayer> {
248 3600 : let mut resident = None;
249 3600 :
250 3600 : let owner = Layer(Arc::new_cyclic(|owner| {
251 3600 : let inner = Arc::new(DownloadedLayer {
252 3600 : owner: owner.clone(),
253 3600 : kind: tokio::sync::OnceCell::default(),
254 3600 : version: 0,
255 3600 : });
256 3600 : resident = Some(inner.clone());
257 3600 :
258 3600 : let local_path = local_layer_path(
259 3600 : conf,
260 3600 : &timeline.tenant_shard_id,
261 3600 : &timeline.timeline_id,
262 3600 : &desc.layer_name(),
263 3600 : &timeline.generation,
264 3600 : );
265 3600 :
266 3600 : LayerInner::new(
267 3600 : conf,
268 3600 : timeline,
269 3600 : local_path,
270 3600 : desc,
271 3600 : Some(inner),
272 3600 : timeline.generation,
273 3600 : timeline.get_shard_index(),
274 3600 : )
275 3600 : }));
276 3600 :
277 3600 : let downloaded = resident.expect("just initialized");
278 3600 :
279 3600 : // We never want to overwrite an existing file, so we use `RENAME_NOREPLACE`.
280 3600 : // TODO: this leaves the temp file in place if the rename fails, risking us running
281 3600 : // out of space. Should we clean it up here or does the calling context deal with this?
282 3600 : utils::fs_ext::rename_noreplace(temp_path.as_std_path(), owner.local_path().as_std_path())
283 3600 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
284 :
285 3600 : Ok(ResidentLayer { downloaded, owner })
286 3600 : }
287 :
    /// Requests the layer to be evicted and waits for this to be done.
    ///
    /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
    ///
    /// If, through bad luck or blocking of the executor, we miss the actual eviction and the
    /// layer is re-downloaded, [`EvictionError::Downloaded`] is returned.
    ///
    /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
    /// will happen regardless of whether the future returned by this method completes, unless
    /// there is a read access before eviction gets to complete.
    ///
    /// Technically cancellation safe, but cancelling might shift the viewpoint of which
    /// generation of the download-evict cycle a retry observes.
    pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
        self.0.evict_and_wait(timeout).await
    }
304 :
    /// Delete the layer file when `self` gets dropped, and also try to schedule a remote index
    /// upload then.
    ///
    /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
    /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
    /// the value this is called on gets dropped.
    ///
    /// This is ensured by both of those methods accepting references to Layer.
    ///
    /// [gc]: crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_gc_update
    /// [compaction]: crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_compaction_update
    pub(crate) fn delete_on_drop(&self) {
        self.0.delete_on_drop();
    }
319 :
320 479440 : pub(crate) async fn get_values_reconstruct_data(
321 479440 : &self,
322 479440 : keyspace: KeySpace,
323 479440 : lsn_range: Range<Lsn>,
324 479440 : reconstruct_data: &mut ValuesReconstructState,
325 479440 : ctx: &RequestContext,
326 479440 : ) -> Result<(), GetVectoredError> {
327 479440 : let downloaded = self
328 479440 : .0
329 479440 : .get_or_maybe_download(true, Some(ctx))
330 479440 : .await
331 479440 : .map_err(|err| match err {
332 : DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
333 0 : GetVectoredError::Cancelled
334 : }
335 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
336 479440 : })?;
337 479440 : let this = ResidentLayer {
338 479440 : downloaded: downloaded.clone(),
339 479440 : owner: self.clone(),
340 479440 : };
341 479440 :
342 479440 : self.record_access(ctx);
343 479440 :
344 479440 : downloaded
345 479440 : .get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
346 479440 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
347 479440 : .await
348 479440 : .map_err(|err| match err {
349 0 : GetVectoredError::Other(err) => GetVectoredError::Other(
350 0 : err.context(format!("get_values_reconstruct_data for layer {self}")),
351 0 : ),
352 0 : err => err,
353 479440 : })
354 479440 : }
355 :
356 : /// Download the layer if evicted.
357 : ///
358 : /// Will not error when the layer is already downloaded.
359 0 : pub(crate) async fn download(&self) -> Result<(), DownloadError> {
360 0 : self.0.get_or_maybe_download(true, None).await?;
361 0 : Ok(())
362 0 : }
363 :
    /// Delegates to `LayerInner::needs_download`.
    ///
    /// NOTE(review): presumably `Ok(None)` means no download is needed and `Ok(Some(reason))`
    /// gives the reason one is; `for_evicted`/`for_resident` assert `is_some()`/`is_none()` on
    /// the blocking variant accordingly. Confirm against the implementation.
    pub(crate) async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
        self.0.needs_download().await
    }
367 :
368 : /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
369 : /// while the guard exists.
370 : ///
371 : /// Returns None if the layer is currently evicted or becoming evicted.
372 40 : pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
373 40 : let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
374 :
375 28 : Some(ResidentLayer {
376 28 : downloaded,
377 28 : owner: self.clone(),
378 28 : })
379 40 : }
380 :
381 : /// Weak indicator of is the layer resident or not. Good enough for eviction, which can deal
382 : /// with `EvictionError::NotFound`.
383 : ///
384 : /// Returns `true` if this layer might be resident, or `false`, if it most likely evicted or
385 : /// will be unless a read happens soon.
386 357 : pub(crate) fn is_likely_resident(&self) -> bool {
387 357 : self.0
388 357 : .inner
389 357 : .get()
390 357 : .map(|rowe| rowe.is_likely_resident())
391 357 : .unwrap_or(false)
392 357 : }
393 :
394 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
395 1148 : pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
396 1148 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
397 :
398 1148 : Ok(ResidentLayer {
399 1148 : downloaded,
400 1148 : owner: self.clone(),
401 1148 : })
402 1148 : }
403 :
    /// Returns the [`HistoricLayerInfo`] produced by `LayerInner::info`, forwarding `reset`.
    pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
        self.0.info(reset)
    }
407 :
    /// Returns the latest activity time as tracked by this layer's access stats.
    pub(crate) fn latest_activity(&self) -> SystemTime {
        self.0.access_stats.latest_activity()
    }
411 :
    /// Returns the current visibility hint stored in this layer's access stats.
    pub(crate) fn visibility(&self) -> LayerVisibilityHint {
        self.0.access_stats.visibility()
    }
415 :
    /// Returns the local filesystem path recorded for this layer at construction.
    ///
    /// The file is not guaranteed to exist at this path: an evicted layer has a path too.
    pub(crate) fn local_path(&self) -> &Utf8Path {
        &self.0.path
    }
419 :
    /// Returns the [`LayerFileMetadata`] produced by `LayerInner::metadata`.
    pub(crate) fn metadata(&self) -> LayerFileMetadata {
        self.0.metadata()
    }
423 :
424 52 : pub(crate) fn last_evicted_at(&self) -> LastEviction {
425 52 : match self.0.last_evicted_at.try_lock() {
426 52 : Ok(lock) => match *lock {
427 0 : None => LastEviction::Never,
428 52 : Some(at) => LastEviction::At(at),
429 : },
430 0 : Err(std::sync::TryLockError::WouldBlock) => LastEviction::Evicting,
431 0 : Err(std::sync::TryLockError::Poisoned(p)) => panic!("Lock poisoned: {p}"),
432 : }
433 52 : }
434 :
435 0 : pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
436 0 : self.0
437 0 : .timeline
438 0 : .upgrade()
439 0 : .map(|timeline| timeline.timeline_id)
440 0 : }
441 :
442 : /// Traditional debug dumping facility
443 : #[allow(unused)]
444 8 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
445 8 : self.0.desc.dump();
446 8 :
447 8 : if verbose {
448 : // for now, unconditionally download everything, even if that might not be wanted.
449 8 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
450 8 : l.dump(&self.0, ctx).await?
451 0 : }
452 :
453 8 : Ok(())
454 8 : }
455 :
456 : /// Waits until this layer has been dropped (and if needed, local file deletion and remote
457 : /// deletion scheduling has completed).
458 : ///
459 : /// Does not start local deletion, use [`Self::delete_on_drop`] for that
460 : /// separatedly.
461 : #[cfg(any(feature = "testing", test))]
462 4 : pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
463 4 : let mut rx = self.0.status.as_ref().unwrap().subscribe();
464 :
465 4 : async move {
466 : loop {
467 4 : if rx.changed().await.is_err() {
468 4 : break;
469 0 : }
470 : }
471 4 : }
472 4 : }
473 :
474 479440 : fn record_access(&self, ctx: &RequestContext) {
475 479440 : if self.0.access_stats.record_access(ctx) {
476 : // Visibility was modified to Visible: maybe log about this
477 0 : match ctx.task_kind() {
478 : TaskKind::CalculateSyntheticSize
479 : | TaskKind::OndemandLogicalSizeCalculation
480 : | TaskKind::GarbageCollector
481 0 : | TaskKind::MgmtRequest => {
482 0 : // This situation is expected in code paths do binary searches of the LSN space to resolve
483 0 : // an LSN to a timestamp, which happens during GC, during GC cutoff calculations in synthetic size,
484 0 : // and on-demand for certain HTTP API requests. On-demand logical size calculation is also included
485 0 : // because it is run as a sub-task of synthetic size.
486 0 : }
487 : _ => {
488 : // In all other contexts, it is unusual to do I/O involving layers which are not visible at
489 : // some branch tip, so we log the fact that we are accessing something that the visibility
490 : // calculation thought should not be visible.
491 : //
492 : // This case is legal in brief time windows: for example an in-flight getpage request can hold on to a layer object
493 : // which was covered by a concurrent compaction.
494 0 : tracing::info!(
495 : layer=%self,
496 0 : "became visible as a result of access",
497 : );
498 : }
499 : }
500 :
501 : // Update the timeline's visible bytes count
502 0 : if let Some(tl) = self.0.timeline.upgrade() {
503 0 : tl.metrics
504 0 : .visible_physical_size_gauge
505 0 : .add(self.0.desc.file_size)
506 0 : }
507 479440 : }
508 479440 : }
509 :
510 708 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
511 708 : let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
512 : use LayerVisibilityHint::*;
513 708 : match (old_visibility, visibility) {
514 : (Visible, Covered) => {
515 : // Subtract this layer's contribution to the visible size metric
516 72 : if let Some(tl) = self.0.timeline.upgrade() {
517 72 : debug_assert!(
518 72 : tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
519 : );
520 72 : tl.metrics
521 72 : .visible_physical_size_gauge
522 72 : .sub(self.0.desc.file_size)
523 0 : }
524 : }
525 : (Covered, Visible) => {
526 : // Add this layer's contribution to the visible size metric
527 0 : if let Some(tl) = self.0.timeline.upgrade() {
528 0 : tl.metrics
529 0 : .visible_physical_size_gauge
530 0 : .add(self.0.desc.file_size)
531 0 : }
532 : }
533 636 : (Covered, Covered) | (Visible, Visible) => {
534 636 : // no change
535 636 : }
536 : }
537 708 : }
538 : }
539 :
/// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
///
/// However when we want something evicted, we cannot evict it right away as there might be current
/// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
/// read with [`Layer::get_values_reconstruct_data`].
///
/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
#[derive(Debug)]
enum ResidentOrWantedEvicted {
    /// The layer is resident; this value holds a strong reference to the downloaded layer.
    Resident(Arc<DownloadedLayer>),
    /// Eviction has been requested; only a weak reference is held. The `usize` is the
    /// `DownloadedLayer::version` of the initialization that is wanted evicted.
    WantedEvicted(Weak<DownloadedLayer>, usize),
}
552 :
553 : impl ResidentOrWantedEvicted {
554 : /// Non-mutating access to the a DownloadedLayer, if possible.
555 : ///
556 : /// This is not used on the read path (anything that calls
557 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
558 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
559 28 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
560 28 : match self {
561 28 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
562 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
563 : }
564 28 : }
565 :
566 : /// Best-effort query for residency right now, not as strong guarantee as receiving a strong
567 : /// reference from `ResidentOrWantedEvicted::get`.
568 221 : fn is_likely_resident(&self) -> bool {
569 221 : match self {
570 209 : ResidentOrWantedEvicted::Resident(_) => true,
571 12 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
572 : }
573 221 : }
574 :
575 : /// Upgrades any weak to strong if possible.
576 : ///
577 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
578 : /// happened.
579 480588 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
580 480588 : match self {
581 480572 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
582 16 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
583 0 : Some(strong) => {
584 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
585 0 :
586 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
587 0 :
588 0 : Some((strong, true))
589 : }
590 16 : None => None,
591 : },
592 : }
593 480588 : }
594 :
595 : /// When eviction is first requested, drop down to holding a [`Weak`].
596 : ///
597 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
598 : /// drop the possibly last strong reference outside of the mutex of
599 : /// [`heavier_once_cell::OnceCell`].
600 92 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
601 92 : match self {
602 84 : ResidentOrWantedEvicted::Resident(strong) => {
603 84 : let weak = Arc::downgrade(strong);
604 84 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
605 84 : std::mem::swap(self, &mut temp);
606 84 : match temp {
607 84 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
608 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
609 : }
610 : }
611 8 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
612 : }
613 92 : }
614 : }
615 :
/// The shared state behind a [`Layer`], which wraps it in an [`Arc`].
struct LayerInner {
    /// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of
    /// [`Self::path`].
    conf: &'static PageServerConf,

    /// Full path to the file; unclear if this should exist anymore.
    path: Utf8PathBuf,

    desc: PersistentLayerDesc,

    /// Timeline access is needed for remote timeline client and metrics.
    ///
    /// There should not be an access to timeline for any reason without entering the
    /// [`Timeline::gate`] at the same time.
    timeline: Weak<Timeline>,

    access_stats: LayerAccessStats,

    /// This custom OnceCell is backed by std mutex, but only held for short time periods.
    ///
    /// Filesystem changes (download, evict) are only done while holding a permit which the
    /// `heavier_once_cell` provides.
    ///
    /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
    /// possibly read while not holding it.
    inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,

    /// Do we want to delete locally and remotely this when `LayerInner` is dropped
    wanted_deleted: AtomicBool,

    /// Version is to make sure we will only evict a specific initialization of the downloaded file.
    ///
    /// Incremented for each initialization, stored in `DownloadedLayer::version` or
    /// `ResidentOrWantedEvicted::WantedEvicted`.
    version: AtomicUsize,

    /// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
    /// starts, or completes.
    ///
    /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
    /// Holding the InitPermit is the only time we can do state transitions, but we also need to
    /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
    /// [`ResidentOrWantedEvicted::Resident`] on access.
    ///
    /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
    status: Option<tokio::sync::watch::Sender<Status>>,

    /// Counter for exponential backoff with the download.
    ///
    /// This is atomic only for the purposes of having additional data only accessed while holding
    /// the InitPermit.
    consecutive_failures: AtomicUsize,

    /// The generation of this Layer.
    ///
    /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
    /// for created layers from [`Timeline::generation`].
    generation: Generation,

    /// The shard of this Layer.
    ///
    /// For layers created in this process, this will always be the [`ShardIndex`] of the
    /// current `ShardIdentity` (TODO: add link once it's introduced).
    ///
    /// For loaded layers, this may be some other value if the tenant has undergone
    /// a shard split since the layer was originally written.
    shard: ShardIndex,

    /// When the Layer was last evicted but has not been downloaded since.
    ///
    /// This is used for skipping evicted layers from the previous heatmap (see
    /// `[Timeline::generate_heatmap]`) and for updating metrics
    /// (see [`LayerImplMetrics::redownload_after`]).
    last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,

    #[cfg(test)]
    failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
}
694 :
695 : impl std::fmt::Display for LayerInner {
696 108 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
697 108 : write!(f, "{}", self.layer_desc().short_id())
698 108 : }
699 : }
700 :
impl AsLayerDesc for LayerInner {
    /// Returns a reference to the stored [`PersistentLayerDesc`].
    fn layer_desc(&self) -> &PersistentLayerDesc {
        &self.desc
    }
}
706 :
/// Value published on the `LayerInner::status` watch channel.
#[derive(Debug, Clone, Copy)]
enum Status {
    /// Initial status of a layer constructed with a downloaded file.
    Resident,
    /// Initial status of a layer constructed without a downloaded file.
    Evicted,
    /// NOTE(review): presumably published when a download starts; the code that posts it is
    /// not visible here, so confirm against the download path.
    Downloading,
}
713 :
impl Drop for LayerInner {
    /// Balances the metrics taken at construction and, if deletion was requested through
    /// `delete_on_drop`, removes the local file and schedules the remote deletion on a
    /// separate task via `Self::spawn_blocking`.
    fn drop(&mut self) {
        // if there was a pending eviction, mark it cancelled here to balance metrics
        if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
        {
            // eviction has already been started
            LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);

            // eviction request is intentionally not honored as no one is present to wait for it
            // and we could be delaying shutdown for nothing.
        }

        // Kept as an `Option` because it is needed again inside the deletion closure below.
        let timeline = self.timeline.upgrade();

        if let Some(timeline) = timeline.as_ref() {
            // Only need to decrement metrics if the timeline still exists: otherwise
            // it will have already de-registered these metrics via TimelineMetrics::shutdown
            timeline.metrics.dec_layer(&self.desc);

            // Only layers currently counted as visible contribute to the gauge.
            if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
                debug_assert!(
                    timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
                );
                timeline
                    .metrics
                    .visible_physical_size_gauge
                    .sub(self.desc.file_size);
            }
        }

        // Nothing more to do unless deletion was requested.
        if !*self.wanted_deleted.get_mut() {
            return;
        }

        let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);

        // Move everything the closure needs out of `self`; the closure cannot borrow from it.
        let path = std::mem::take(&mut self.path);
        let file_name = self.layer_desc().layer_name();
        let file_size = self.layer_desc().file_size;
        let meta = self.metadata();
        let status = self.status.take();

        Self::spawn_blocking(move || {
            let _g = span.entered();

            // carry this until we are finished for [`Layer::wait_drop`] support
            let _status = status;

            let Some(timeline) = timeline else {
                // no need to nag that timeline is gone: under normal situation on
                // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
                LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
                return;
            };

            // A gate that can no longer be entered is counted the same as a missing timeline.
            let Ok(_guard) = timeline.gate.enter() else {
                LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
                return;
            };

            let removed = match std::fs::remove_file(path) {
                Ok(()) => true,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    // until we no longer do detaches by removing all local files before removing the
                    // tenant from the global map, we will always get these errors even if we knew what
                    // is the latest state.
                    //
                    // we currently do not track the latest state, so we'll also end up here on evicted
                    // layers.
                    false
                }
                Err(e) => {
                    tracing::error!("failed to remove wanted deleted layer: {e}");
                    LAYER_IMPL_METRICS.inc_delete_removes_failed();
                    false
                }
            };

            // The resident size metric only shrinks when a file was actually removed.
            if removed {
                timeline.metrics.resident_physical_size_sub(file_size);
            }
            let res = timeline
                .remote_client
                .schedule_deletion_of_unlinked(vec![(file_name, meta)]);

            if let Err(e) = res {
                // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
                // demonstrating this deadlock (without spawn_blocking): stop will drop
                // queued items, which will have ResidentLayer's, and those drops would try
                // to re-entrantly lock the RemoteTimelineClient inner state.
                if !timeline.is_active() {
                    tracing::info!("scheduling deletion on drop failed: {e:#}");
                } else {
                    tracing::warn!("scheduling deletion on drop failed: {e:#}");
                }
                LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
            } else {
                LAYER_IMPL_METRICS.inc_completed_deletes();
            }
        });
    }
}
816 :
817 : impl LayerInner {
818 : #[allow(clippy::too_many_arguments)]
819 3844 : fn new(
820 3844 : conf: &'static PageServerConf,
821 3844 : timeline: &Arc<Timeline>,
822 3844 : local_path: Utf8PathBuf,
823 3844 : desc: PersistentLayerDesc,
824 3844 : downloaded: Option<Arc<DownloadedLayer>>,
825 3844 : generation: Generation,
826 3844 : shard: ShardIndex,
827 3844 : ) -> Self {
828 3844 : let (inner, version, init_status) = if let Some(inner) = downloaded {
829 3844 : let version = inner.version;
830 3844 : let resident = ResidentOrWantedEvicted::Resident(inner);
831 3844 : (
832 3844 : heavier_once_cell::OnceCell::new(resident),
833 3844 : version,
834 3844 : Status::Resident,
835 3844 : )
836 : } else {
837 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
838 : };
839 :
840 : // This object acts as a RAII guard on these metrics: increment on construction
841 3844 : timeline.metrics.inc_layer(&desc);
842 3844 :
843 3844 : // New layers are visible by default. This metric is later updated on drop or in set_visibility
844 3844 : timeline
845 3844 : .metrics
846 3844 : .visible_physical_size_gauge
847 3844 : .add(desc.file_size);
848 3844 :
849 3844 : LayerInner {
850 3844 : conf,
851 3844 : path: local_path,
852 3844 : desc,
853 3844 : timeline: Arc::downgrade(timeline),
854 3844 : access_stats: Default::default(),
855 3844 : wanted_deleted: AtomicBool::new(false),
856 3844 : inner,
857 3844 : version: AtomicUsize::new(version),
858 3844 : status: Some(tokio::sync::watch::channel(init_status).0),
859 3844 : consecutive_failures: AtomicUsize::new(0),
860 3844 : generation,
861 3844 : shard,
862 3844 : last_evicted_at: std::sync::Mutex::default(),
863 3844 : #[cfg(test)]
864 3844 : failpoints: Default::default(),
865 3844 : }
866 3844 : }
867 :
868 1040 : fn delete_on_drop(&self) {
869 1040 : let res =
870 1040 : self.wanted_deleted
871 1040 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
872 1040 :
873 1040 : if res.is_ok() {
874 1032 : LAYER_IMPL_METRICS.inc_started_deletes();
875 1032 : }
876 1040 : }
877 :
878 : /// Cancellation safe, however dropping the future and calling this method again might result
879 : /// in a new attempt to evict OR join the previously started attempt.
880 104 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
881 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
882 : let mut rx = self.status.as_ref().unwrap().subscribe();
883 :
884 : {
885 : let current = rx.borrow_and_update();
886 : match &*current {
887 : Status::Resident => {
888 : // we might get lucky and evict this; continue
889 : }
890 : Status::Evicted | Status::Downloading => {
891 : // it is already evicted
892 : return Err(EvictionError::NotFound);
893 : }
894 : }
895 : }
896 :
897 : let strong = {
898 : match self.inner.get() {
899 : Some(mut either) => either.downgrade(),
900 : None => {
901 : // we already have a scheduled eviction, which just has not gotten to run yet.
902 : // it might still race with a read access, but that could also get cancelled,
903 : // so let's say this is not evictable.
904 : return Err(EvictionError::NotFound);
905 : }
906 : }
907 : };
908 :
909 : if strong.is_some() {
910 : // drop the DownloadedLayer outside of the holding the guard
911 : drop(strong);
912 :
913 : // idea here is that only one evicter should ever get to witness a strong reference,
914 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
915 : // cancelled eviction and signal us, like it currently does.
916 : //
917 : // a second concurrent evict_and_wait will not see a strong reference.
918 : LAYER_IMPL_METRICS.inc_started_evictions();
919 : }
920 :
921 : let changed = rx.changed();
922 : let changed = tokio::time::timeout(timeout, changed).await;
923 :
924 : let Ok(changed) = changed else {
925 : return Err(EvictionError::Timeout);
926 : };
927 :
928 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
929 :
930 : let current = rx.borrow_and_update();
931 :
932 : match &*current {
933 : // the easiest case
934 : Status::Evicted => Ok(()),
935 : // it surely was evicted in between, but then there was a new access now; we can't know
936 : // if it'll succeed so lets just call it evicted
937 : Status::Downloading => Ok(()),
938 : // either the download which was started after eviction completed already, or it was
939 : // never evicted
940 : Status::Resident => Err(EvictionError::Downloaded),
941 : }
942 : }
943 :
944 : /// Cancellation safe.
945 480620 : async fn get_or_maybe_download(
946 480620 : self: &Arc<Self>,
947 480620 : allow_download: bool,
948 480620 : ctx: Option<&RequestContext>,
949 480620 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
950 48 : let (weak, permit) = {
951 : // get_or_init_detached can:
952 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
953 : // - be slow (wait for semaphore permit or closing)
954 480620 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
955 :
956 480620 : let locked = self
957 480620 : .inner
958 480620 : .get_or_init_detached()
959 480620 : .await
960 480620 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
961 480620 :
962 480620 : scopeguard::ScopeGuard::into_inner(init_cancelled);
963 :
964 480572 : match locked {
965 : // this path could had been a RwLock::read
966 480572 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
967 0 : Ok(Ok((strong, _))) => {
968 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
969 0 : // previously a `evict_and_wait` was received. this is the only place when we
970 0 : // send out an update without holding the InitPermit.
971 0 : //
972 0 : // note that we also have dropped the Guard; this is fine, because we just made
973 0 : // a state change and are holding a strong reference to be returned.
974 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
975 0 : LAYER_IMPL_METRICS
976 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
977 0 :
978 0 : return Ok(strong);
979 : }
980 16 : Ok(Err(guard)) => {
981 16 : // path to here: we won the eviction, the file should still be on the disk.
982 16 : let (weak, permit) = guard.take_and_deinit();
983 16 : (Some(weak), permit)
984 : }
985 32 : Err(permit) => (None, permit),
986 : }
987 : };
988 :
989 48 : if let Some(weak) = weak {
990 : // only drop the weak after dropping the heavier_once_cell guard
991 16 : assert!(
992 16 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
993 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
994 : );
995 32 : }
996 :
997 48 : let timeline = self
998 48 : .timeline
999 48 : .upgrade()
1000 48 : .ok_or(DownloadError::TimelineShutdown)?;
1001 :
1002 : // count cancellations, which currently remain largely unexpected
1003 48 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1004 :
1005 : // check if we really need to be downloaded: this can happen if a read access won the
1006 : // semaphore before eviction.
1007 : //
1008 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
1009 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
1010 48 : let needs_download = self
1011 48 : .needs_download()
1012 48 : .await
1013 48 : .map_err(DownloadError::PreStatFailed);
1014 48 :
1015 48 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1016 :
1017 48 : let needs_download = needs_download?;
1018 :
1019 48 : let Some(reason) = needs_download else {
1020 : // the file is present locally because eviction has not had a chance to run yet
1021 :
1022 : #[cfg(test)]
1023 16 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
1024 16 : .await?;
1025 :
1026 12 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
1027 12 :
1028 12 : return Ok(self.initialize_after_layer_is_on_disk(permit));
1029 : };
1030 :
1031 : // we must download; getting cancelled before spawning the download is not an issue as
1032 : // any still running eviction would not find anything to evict.
1033 :
1034 32 : if let NeedsDownload::NotFile(ft) = reason {
1035 0 : return Err(DownloadError::NotFile(ft));
1036 32 : }
1037 :
1038 32 : if let Some(ctx) = ctx {
1039 4 : self.check_expected_download(ctx)?;
1040 28 : }
1041 :
1042 32 : if !allow_download {
1043 : // this is only used from tests, but it is hard to test without the boolean
1044 4 : return Err(DownloadError::DownloadRequired);
1045 28 : }
1046 28 :
1047 28 : let download_ctx = ctx
1048 28 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
1049 28 : .unwrap_or(RequestContext::new(
1050 28 : TaskKind::LayerDownload,
1051 28 : DownloadBehavior::Download,
1052 28 : ));
1053 :
1054 28 : async move {
1055 28 : tracing::info!(%reason, "downloading on-demand");
1056 :
1057 28 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1058 28 : let res = self
1059 28 : .download_init_and_wait(timeline, permit, download_ctx)
1060 28 : .await?;
1061 28 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1062 28 : Ok(res)
1063 28 : }
1064 28 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
1065 28 : .await
1066 480620 : }
1067 :
1068 : /// Nag or fail per RequestContext policy
1069 4 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1070 : use crate::context::DownloadBehavior::*;
1071 4 : let b = ctx.download_behavior();
1072 4 : match b {
1073 4 : Download => Ok(()),
1074 : Warn | Error => {
1075 0 : tracing::info!(
1076 0 : "unexpectedly on-demand downloading for task kind {:?}",
1077 0 : ctx.task_kind()
1078 : );
1079 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1080 :
1081 0 : let really_error =
1082 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1083 :
1084 0 : if really_error {
1085 : // this check is only probablistic, seems like flakyness footgun
1086 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1087 : } else {
1088 0 : Ok(())
1089 : }
1090 : }
1091 : }
1092 4 : }
1093 :
1094 : /// Actual download, at most one is executed at the time.
1095 28 : async fn download_init_and_wait(
1096 28 : self: &Arc<Self>,
1097 28 : timeline: Arc<Timeline>,
1098 28 : permit: heavier_once_cell::InitPermit,
1099 28 : ctx: RequestContext,
1100 28 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1101 28 : debug_assert_current_span_has_tenant_and_timeline_id();
1102 28 :
1103 28 : let (tx, rx) = tokio::sync::oneshot::channel();
1104 28 :
1105 28 : let this: Arc<Self> = self.clone();
1106 :
1107 28 : let guard = timeline
1108 28 : .gate
1109 28 : .enter()
1110 28 : .map_err(|_| DownloadError::DownloadCancelled)?;
1111 :
1112 28 : Self::spawn(
1113 28 : async move {
1114 0 : let _guard = guard;
1115 0 :
1116 0 : // now that we have commited to downloading, send out an update to:
1117 0 : // - unhang any pending eviction
1118 0 : // - break out of evict_and_wait
1119 0 : this.status
1120 0 : .as_ref()
1121 0 : .unwrap()
1122 0 : .send_replace(Status::Downloading);
1123 28 :
1124 28 : #[cfg(test)]
1125 28 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1126 28 : .await
1127 28 : .unwrap();
1128 :
1129 28 : let res = this.download_and_init(timeline, permit, &ctx).await;
1130 :
1131 28 : if let Err(res) = tx.send(res) {
1132 0 : match res {
1133 0 : Ok(_res) => {
1134 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1135 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1136 : }
1137 0 : Err(e) => {
1138 0 : tracing::info!(
1139 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1140 : );
1141 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1142 : }
1143 : }
1144 28 : }
1145 28 : }
1146 28 : .in_current_span(),
1147 28 : );
1148 28 :
1149 28 : match rx.await {
1150 28 : Ok(Ok(res)) => Ok(res),
1151 : Ok(Err(remote_storage::DownloadError::Cancelled)) => {
1152 0 : Err(DownloadError::DownloadCancelled)
1153 : }
1154 0 : Ok(Err(_)) => Err(DownloadError::DownloadFailed),
1155 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1156 : }
1157 28 : }
1158 :
1159 28 : async fn download_and_init(
1160 28 : self: &Arc<LayerInner>,
1161 28 : timeline: Arc<Timeline>,
1162 28 : permit: heavier_once_cell::InitPermit,
1163 28 : ctx: &RequestContext,
1164 28 : ) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
1165 28 : let result = timeline
1166 28 : .remote_client
1167 28 : .download_layer_file(
1168 28 : &self.desc.layer_name(),
1169 28 : &self.metadata(),
1170 28 : &self.path,
1171 28 : &timeline.gate,
1172 28 : &timeline.cancel,
1173 28 : ctx,
1174 28 : )
1175 28 : .await;
1176 :
1177 28 : match result {
1178 28 : Ok(size) => {
1179 28 : assert_eq!(size, self.desc.file_size);
1180 :
1181 28 : match self.needs_download().await {
1182 0 : Ok(Some(reason)) => {
1183 0 : // this is really a bug in needs_download or remote timeline client
1184 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1185 : }
1186 28 : Ok(None) => {
1187 28 : // as expected
1188 28 : }
1189 0 : Err(e) => {
1190 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1191 : }
1192 : }
1193 :
1194 28 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1195 28 : timeline
1196 28 : .metrics
1197 28 : .resident_physical_size_add(self.desc.file_size);
1198 28 : self.consecutive_failures.store(0, Ordering::Relaxed);
1199 28 :
1200 28 : let since_last_eviction = self
1201 28 : .last_evicted_at
1202 28 : .lock()
1203 28 : .unwrap()
1204 28 : .take()
1205 28 : .map(|ts| ts.elapsed());
1206 28 : if let Some(since_last_eviction) = since_last_eviction {
1207 28 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1208 28 : }
1209 :
1210 28 : self.access_stats.record_residence_event();
1211 28 :
1212 28 : Ok(self.initialize_after_layer_is_on_disk(permit))
1213 : }
1214 0 : Err(e) => {
1215 0 : let consecutive_failures =
1216 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1217 0 :
1218 0 : if timeline.cancel.is_cancelled() {
1219 : // If we're shutting down, drop out before logging the error
1220 0 : return Err(e);
1221 0 : }
1222 0 :
1223 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1224 :
1225 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1226 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1227 0 : 1.5,
1228 0 : 60.0,
1229 0 : );
1230 0 :
1231 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1232 0 :
1233 0 : tokio::select! {
1234 0 : _ = tokio::time::sleep(backoff) => {},
1235 0 : _ = timeline.cancel.cancelled() => {},
1236 : };
1237 :
1238 0 : Err(e)
1239 : }
1240 : }
1241 28 : }
1242 :
1243 : /// Initializes the `Self::inner` to a "resident" state.
1244 : ///
1245 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1246 : /// before calling this method.
1247 : ///
1248 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1249 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1250 40 : fn initialize_after_layer_is_on_disk(
1251 40 : self: &Arc<LayerInner>,
1252 40 : permit: heavier_once_cell::InitPermit,
1253 40 : ) -> Arc<DownloadedLayer> {
1254 40 : debug_assert_current_span_has_tenant_and_timeline_id();
1255 40 :
1256 40 : // disable any scheduled but not yet running eviction deletions for this initialization
1257 40 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1258 40 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1259 40 :
1260 40 : let res = Arc::new(DownloadedLayer {
1261 40 : owner: Arc::downgrade(self),
1262 40 : kind: tokio::sync::OnceCell::default(),
1263 40 : version: next_version,
1264 40 : });
1265 40 :
1266 40 : let waiters = self.inner.initializer_count();
1267 40 : if waiters > 0 {
1268 0 : tracing::info!(waiters, "completing layer init for other tasks");
1269 40 : }
1270 :
1271 40 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1272 40 :
1273 40 : self.inner.set(value, permit);
1274 40 :
1275 40 : res
1276 40 : }
1277 :
1278 688 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1279 688 : match tokio::fs::metadata(&self.path).await {
1280 656 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1281 32 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1282 0 : Err(e) => Err(e),
1283 : }
1284 688 : }
1285 :
1286 244 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1287 244 : match self.path.metadata() {
1288 244 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1289 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1290 0 : Err(e) => Err(e),
1291 : }
1292 244 : }
1293 :
1294 900 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1295 900 : // in future, this should include sha2-256 validation of the file.
1296 900 : if !m.is_file() {
1297 0 : Err(NeedsDownload::NotFile(m.file_type()))
1298 900 : } else if m.len() != self.desc.file_size {
1299 0 : Err(NeedsDownload::WrongSize {
1300 0 : actual: m.len(),
1301 0 : expected: self.desc.file_size,
1302 0 : })
1303 : } else {
1304 900 : Ok(())
1305 : }
1306 900 : }
1307 :
1308 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1309 0 : let layer_name = self.desc.layer_name().to_string();
1310 0 :
1311 0 : let resident = self
1312 0 : .inner
1313 0 : .get()
1314 0 : .map(|rowe| rowe.is_likely_resident())
1315 0 : .unwrap_or(false);
1316 0 :
1317 0 : let access_stats = self.access_stats.as_api_model(reset);
1318 0 :
1319 0 : if self.desc.is_delta {
1320 0 : let lsn_range = &self.desc.lsn_range;
1321 0 :
1322 0 : HistoricLayerInfo::Delta {
1323 0 : layer_file_name: layer_name,
1324 0 : layer_file_size: self.desc.file_size,
1325 0 : lsn_start: lsn_range.start,
1326 0 : lsn_end: lsn_range.end,
1327 0 : remote: !resident,
1328 0 : access_stats,
1329 0 : l0: crate::tenant::layer_map::LayerMap::is_l0(
1330 0 : &self.layer_desc().key_range,
1331 0 : self.layer_desc().is_delta,
1332 0 : ),
1333 0 : }
1334 : } else {
1335 0 : let lsn = self.desc.image_layer_lsn();
1336 0 :
1337 0 : HistoricLayerInfo::Image {
1338 0 : layer_file_name: layer_name,
1339 0 : layer_file_size: self.desc.file_size,
1340 0 : lsn_start: lsn,
1341 0 : remote: !resident,
1342 0 : access_stats,
1343 0 : }
1344 : }
1345 0 : }
1346 :
1347 : /// `DownloadedLayer` is being dropped, so it calls this method.
1348 80 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1349 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1350 : // though here it is very likely
1351 80 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1352 :
1353 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1354 : // drop while the `self.inner` is being locked, leading to a deadlock.
1355 :
1356 80 : let start_evicting = async move {
1357 80 : #[cfg(test)]
1358 80 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1359 80 : .await
1360 80 : .expect("failpoint should not have errored");
1361 80 :
1362 80 : tracing::debug!("eviction started");
1363 :
1364 80 : let res = self.wait_for_turn_and_evict(only_version).await;
1365 : // metrics: ignore the Ok branch, it is not done yet
1366 80 : if let Err(e) = res {
1367 12 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1368 12 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1369 68 : }
1370 80 : };
1371 :
1372 80 : Self::spawn(start_evicting.instrument(span));
1373 80 : }
1374 :
1375 80 : async fn wait_for_turn_and_evict(
1376 80 : self: Arc<LayerInner>,
1377 80 : only_version: usize,
1378 80 : ) -> Result<(), EvictionCancelled> {
1379 156 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1380 : use Status::*;
1381 156 : match status {
1382 152 : Resident => Ok(()),
1383 4 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1384 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1385 : }
1386 156 : }
1387 :
1388 80 : let timeline = self
1389 80 : .timeline
1390 80 : .upgrade()
1391 80 : .ok_or(EvictionCancelled::TimelineGone)?;
1392 :
1393 80 : let mut rx = self
1394 80 : .status
1395 80 : .as_ref()
1396 80 : .expect("LayerInner cannot be dropped, holding strong ref")
1397 80 : .subscribe();
1398 80 :
1399 80 : is_good_to_continue(&rx.borrow_and_update())?;
1400 :
1401 76 : let Ok(gate) = timeline.gate.enter() else {
1402 0 : return Err(EvictionCancelled::TimelineGone);
1403 : };
1404 :
1405 68 : let permit = {
1406 : // we cannot just `std::fs::remove_file` because there might already be an
1407 : // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
1408 : // operations must be done while holding the heavier_once_cell::InitPermit
1409 76 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1410 :
1411 76 : let waited = loop {
1412 : // we must race to the Downloading starting, otherwise we would have to wait until the
1413 : // completion of the download. waiting for download could be long and hinder our
1414 : // efforts to alert on "hanging" evictions.
1415 76 : tokio::select! {
1416 76 : res = &mut wait => break res,
1417 76 : _ = rx.changed() => {
1418 0 : is_good_to_continue(&rx.borrow_and_update())?;
1419 : // two possibilities for Status::Resident:
1420 : // - the layer was found locally from disk by a read
1421 : // - we missed a bunch of updates and now the layer is
1422 : // again downloaded -- assume we'll fail later on with
1423 : // version check or AlreadyReinitialized
1424 : }
1425 : }
1426 : };
1427 :
1428 : // re-check now that we have the guard or permit; all updates should have happened
1429 : // while holding the permit.
1430 76 : is_good_to_continue(&rx.borrow_and_update())?;
1431 :
1432 : // the term deinitialize is used here, because we clearing out the Weak will eventually
1433 : // lead to deallocating the reference counted value, and the value we
1434 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1435 76 : let (_weak, permit) = match waited {
1436 72 : Ok(guard) => {
1437 72 : match &*guard {
1438 68 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1439 68 : if *version == only_version =>
1440 64 : {
1441 64 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1442 64 : let (weak, permit) = guard.take_and_deinit();
1443 64 : (Some(weak), permit)
1444 : }
1445 4 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1446 4 : // if we were not doing the version check, we would need to try to
1447 4 : // upgrade the weak here to see if it really is dropped. version check
1448 4 : // is done instead assuming that it is cheaper.
1449 4 : tracing::debug!(
1450 : version,
1451 : only_version,
1452 0 : "version mismatch, not deinitializing"
1453 : );
1454 4 : return Err(EvictionCancelled::VersionCheckFailed);
1455 : }
1456 : ResidentOrWantedEvicted::Resident(_) => {
1457 4 : return Err(EvictionCancelled::AlreadyReinitialized);
1458 : }
1459 : }
1460 : }
1461 4 : Err(permit) => {
1462 4 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1463 4 : (None, permit)
1464 : }
1465 : };
1466 :
1467 68 : permit
1468 68 : };
1469 68 :
1470 68 : let span = tracing::Span::current();
1471 68 :
1472 68 : let spawned_at = std::time::Instant::now();
1473 68 :
1474 68 : // this is on purpose a detached spawn; we don't need to wait for it
1475 68 : //
1476 68 : // eviction completion reporting is the only thing hinging on this, and it can be just as
1477 68 : // well from a spawn_blocking thread.
1478 68 : //
1479 68 : // important to note that now that we've acquired the permit we have made sure the evicted
1480 68 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1481 68 : // there are multiple evictions. The rest is not cancellable, and we've now commited to
1482 68 : // evicting.
1483 68 : //
1484 68 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1485 68 : // reads. We will need to add cancellation for that if necessary.
1486 68 : Self::spawn_blocking(move || {
1487 68 : let _span = span.entered();
1488 68 :
1489 68 : let res = self.evict_blocking(&timeline, &gate, &permit);
1490 68 :
1491 68 : let waiters = self.inner.initializer_count();
1492 68 :
1493 68 : if waiters > 0 {
1494 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1495 68 : }
1496 :
1497 68 : let completed_in = spawned_at.elapsed();
1498 68 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1499 68 :
1500 68 : match res {
1501 68 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1502 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1503 : }
1504 :
1505 68 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1506 68 : });
1507 68 :
1508 68 : Ok(())
1509 80 : }
1510 :
1511 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1512 68 : fn evict_blocking(
1513 68 : &self,
1514 68 : timeline: &Timeline,
1515 68 : _gate: &gate::GateGuard,
1516 68 : _permit: &heavier_once_cell::InitPermit,
1517 68 : ) -> Result<(), EvictionCancelled> {
1518 68 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1519 68 :
1520 68 : match capture_mtime_and_remove(&self.path) {
1521 68 : Ok(local_layer_mtime) => {
1522 68 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1523 68 : match duration {
1524 68 : Ok(elapsed) => {
1525 68 : let accessed_and_visible = self.access_stats.accessed()
1526 8 : && self.access_stats.visibility() == LayerVisibilityHint::Visible;
1527 68 : if accessed_and_visible {
1528 8 : // Only layers used for reads contribute to our "low residence" metric that is used
1529 8 : // to detect thrashing. Layers promoted for other reasons (e.g. compaction) are allowed
1530 8 : // to be rapidly evicted without contributing to this metric.
1531 8 : timeline
1532 8 : .metrics
1533 8 : .evictions_with_low_residence_duration
1534 8 : .read()
1535 8 : .unwrap()
1536 8 : .observe(elapsed);
1537 60 : }
1538 :
1539 68 : tracing::info!(
1540 0 : residence_millis = elapsed.as_millis(),
1541 0 : accessed_and_visible,
1542 0 : "evicted layer after known residence period"
1543 : );
1544 : }
1545 : Err(_) => {
1546 0 : tracing::info!("evicted layer after unknown residence period");
1547 : }
1548 : }
1549 68 : timeline.metrics.evictions.inc();
1550 68 : timeline
1551 68 : .metrics
1552 68 : .resident_physical_size_sub(self.desc.file_size);
1553 : }
1554 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1555 0 : tracing::error!(
1556 : layer_size = %self.desc.file_size,
1557 0 : "failed to evict layer from disk, it was already gone"
1558 : );
1559 0 : return Err(EvictionCancelled::FileNotFound);
1560 : }
1561 0 : Err(e) => {
1562 0 : // FIXME: this should probably be an abort
1563 0 : tracing::error!("failed to evict file from disk: {e:#}");
1564 0 : return Err(EvictionCancelled::RemoveFailed);
1565 : }
1566 : }
1567 :
1568 68 : self.access_stats.record_residence_event();
1569 68 :
1570 68 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1571 68 :
1572 68 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1573 68 :
1574 68 : Ok(())
1575 68 : }
1576 :
1577 6290 : fn metadata(&self) -> LayerFileMetadata {
1578 6290 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1579 6290 : }
1580 :
1581 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1582 : ///
1583 : /// Synchronizing with spawned tasks is very complicated otherwise.
1584 108 : fn spawn<F>(fut: F)
1585 108 : where
1586 108 : F: std::future::Future<Output = ()> + Send + 'static,
1587 108 : {
1588 108 : #[cfg(test)]
1589 108 : tokio::task::spawn(fut);
1590 108 : #[cfg(not(test))]
1591 108 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1592 108 : }
1593 :
1594 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1595 1090 : fn spawn_blocking<F>(f: F)
1596 1090 : where
1597 1090 : F: FnOnce() + Send + 'static,
1598 1090 : {
1599 1090 : #[cfg(test)]
1600 1090 : tokio::task::spawn_blocking(f);
1601 1090 : #[cfg(not(test))]
1602 1090 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1603 1090 : }
1604 : }
1605 :
1606 68 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1607 68 : let m = path.metadata()?;
1608 68 : let local_layer_mtime = m.modified()?;
1609 68 : std::fs::remove_file(path)?;
1610 68 : Ok(local_layer_mtime)
1611 68 : }
1612 :
1613 : #[derive(Debug, thiserror::Error)]
1614 : pub(crate) enum EvictionError {
1615 : #[error("layer was already evicted")]
1616 : NotFound,
1617 :
1618 : /// Evictions must always lose to downloads in races, and this time it happened.
1619 : #[error("layer was downloaded instead")]
1620 : Downloaded,
1621 :
1622 : #[error("eviction did not happen within timeout")]
1623 : Timeout,
1624 : }
1625 :
1626 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1627 : #[derive(Debug, thiserror::Error)]
1628 : pub(crate) enum DownloadError {
1629 : #[error("timeline has already shutdown")]
1630 : TimelineShutdown,
1631 : #[error("context denies downloading")]
1632 : ContextAndConfigReallyDeniesDownloads,
1633 : #[error("downloading is really required but not allowed by this method")]
1634 : DownloadRequired,
1635 : #[error("layer path exists, but it is not a file: {0:?}")]
1636 : NotFile(std::fs::FileType),
1637 : /// Why no error here? Because it will be reported by page_service. We should had also done
1638 : /// retries already.
1639 : #[error("downloading evicted layer file failed")]
1640 : DownloadFailed,
1641 : #[error("downloading failed, possibly for shutdown")]
1642 : DownloadCancelled,
1643 : #[error("pre-condition: stat before download failed")]
1644 : PreStatFailed(#[source] std::io::Error),
1645 :
1646 : #[cfg(test)]
1647 : #[error("failpoint: {0:?}")]
1648 : Failpoint(failpoints::FailpointKind),
1649 : }
1650 :
1651 : impl DownloadError {
1652 0 : pub(crate) fn is_cancelled(&self) -> bool {
1653 0 : matches!(self, DownloadError::DownloadCancelled)
1654 0 : }
1655 : }
1656 :
/// Why the local copy of a layer file cannot be used as is.
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
    /// Nothing exists at the layer path.
    NotFound,
    /// Something exists at the layer path, but it is not a regular file.
    NotFile(std::fs::FileType),
    /// The file exists, but its length differs from the expected one.
    WrongSize { actual: u64, expected: u64 },
}

impl std::fmt::Display for NeedsDownload {
    /// Writes a short human readable reason, as used in the on-demand download log line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotFound => f.write_str("file was not found"),
            Self::NotFile(file_type) => write!(f, "path is not a file; {file_type:?}"),
            Self::WrongSize { actual, expected } => {
                write!(f, "file size mismatch {actual} vs. {expected}")
            }
        }
    }
}
1675 :
1676 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1677 : pub(crate) struct DownloadedLayer {
1678 : owner: Weak<LayerInner>,
1679 : // Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
1680 : // DownloadedLayer
1681 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
1682 : version: usize,
1683 : }
1684 :
1685 : impl std::fmt::Debug for DownloadedLayer {
1686 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1687 0 : f.debug_struct("DownloadedLayer")
1688 0 : // owner omitted because it is always "Weak"
1689 0 : .field("kind", &self.kind)
1690 0 : .field("version", &self.version)
1691 0 : .finish()
1692 0 : }
1693 : }
1694 :
1695 : impl Drop for DownloadedLayer {
1696 1490 : fn drop(&mut self) {
1697 1490 : if let Some(owner) = self.owner.upgrade() {
1698 80 : owner.on_downloaded_layer_drop(self.version);
1699 1410 : } else {
1700 1410 : // Layer::drop will handle cancelling the eviction; because of drop order and
1701 1410 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1702 1410 : }
1703 1490 : }
1704 : }
1705 :
1706 : impl DownloadedLayer {
1707 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`].
1708 : /// Failure to load the layer is sticky, i.e., future `get()` calls will return
1709 : /// the initial load failure immediately.
1710 : ///
1711 : /// `owner` parameter is a strong reference at the same `LayerInner` as the
1712 : /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
1713 : /// we will always have the LayerInner on the callstack, so we can just use it.
1714 481528 : async fn get<'a>(
1715 481528 : &'a self,
1716 481528 : owner: &Arc<LayerInner>,
1717 481528 : ctx: &RequestContext,
1718 481528 : ) -> anyhow::Result<&'a LayerKind> {
1719 481528 : let init = || async {
1720 2456 : assert_eq!(
1721 2456 : Weak::as_ptr(&self.owner),
1722 2456 : Arc::as_ptr(owner),
1723 0 : "these are the same, just avoiding the upgrade"
1724 : );
1725 :
1726 2456 : let res = if owner.desc.is_delta {
1727 2188 : let ctx = RequestContextBuilder::extend(ctx)
1728 2188 : .page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
1729 2188 : .build();
1730 2188 : let summary = Some(delta_layer::Summary::expected(
1731 2188 : owner.desc.tenant_shard_id.tenant_id,
1732 2188 : owner.desc.timeline_id,
1733 2188 : owner.desc.key_range.clone(),
1734 2188 : owner.desc.lsn_range.clone(),
1735 2188 : ));
1736 2188 : delta_layer::DeltaLayerInner::load(
1737 2188 : &owner.path,
1738 2188 : summary,
1739 2188 : Some(owner.conf.max_vectored_read_bytes),
1740 2188 : &ctx,
1741 2188 : )
1742 2188 : .await
1743 2188 : .map(LayerKind::Delta)
1744 : } else {
1745 268 : let ctx = RequestContextBuilder::extend(ctx)
1746 268 : .page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
1747 268 : .build();
1748 268 : let lsn = owner.desc.image_layer_lsn();
1749 268 : let summary = Some(image_layer::Summary::expected(
1750 268 : owner.desc.tenant_shard_id.tenant_id,
1751 268 : owner.desc.timeline_id,
1752 268 : owner.desc.key_range.clone(),
1753 268 : lsn,
1754 268 : ));
1755 268 : image_layer::ImageLayerInner::load(
1756 268 : &owner.path,
1757 268 : lsn,
1758 268 : summary,
1759 268 : Some(owner.conf.max_vectored_read_bytes),
1760 268 : &ctx,
1761 268 : )
1762 268 : .await
1763 268 : .map(LayerKind::Image)
1764 : };
1765 :
1766 2456 : match res {
1767 2456 : Ok(layer) => Ok(layer),
1768 0 : Err(err) => {
1769 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1770 0 : // We log this message once over the lifetime of `Self`
1771 0 : // => Ok and good to log backtrace and path here.
1772 0 : tracing::error!(
1773 0 : "layer load failed, assuming permanent failure: {}: {err:?}",
1774 0 : owner.path
1775 : );
1776 0 : Err(err)
1777 : }
1778 : }
1779 4912 : };
1780 481528 : self.kind
1781 481528 : .get_or_init(init)
1782 481528 : .await
1783 481528 : .as_ref()
1784 481528 : // We already logged the full backtrace above, once. Don't repeat that here.
1785 481528 : .map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
1786 481528 : }
1787 :
1788 479440 : async fn get_values_reconstruct_data(
1789 479440 : &self,
1790 479440 : this: ResidentLayer,
1791 479440 : keyspace: KeySpace,
1792 479440 : lsn_range: Range<Lsn>,
1793 479440 : reconstruct_data: &mut ValuesReconstructState,
1794 479440 : ctx: &RequestContext,
1795 479440 : ) -> Result<(), GetVectoredError> {
1796 : use LayerKind::*;
1797 :
1798 479440 : match self
1799 479440 : .get(&this.owner.0, ctx)
1800 479440 : .await
1801 479440 : .map_err(GetVectoredError::Other)?
1802 : {
1803 434324 : Delta(d) => {
1804 434324 : d.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
1805 434324 : .await
1806 : }
1807 45116 : Image(i) => {
1808 45116 : i.get_values_reconstruct_data(this, keyspace, reconstruct_data, ctx)
1809 45116 : .await
1810 : }
1811 : }
1812 479440 : }
1813 :
1814 8 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1815 : use LayerKind::*;
1816 8 : match self.get(owner, ctx).await? {
1817 8 : Delta(d) => d.dump(ctx).await?,
1818 0 : Image(i) => i.dump(ctx).await?,
1819 : }
1820 :
1821 8 : Ok(())
1822 8 : }
1823 : }
1824 :
/// Wrapper around an actual layer implementation.
///
/// Produced by `DownloadedLayer::get`, which picks the variant from the layer descriptor's
/// `is_delta` flag.
#[derive(Debug)]
enum LayerKind {
    /// A loaded delta layer.
    Delta(delta_layer::DeltaLayerInner),
    /// A loaded image layer.
    Image(image_layer::ImageLayerInner),
}
1831 :
/// Guard for forcing a layer be resident while it exists.
#[derive(Clone)]
pub struct ResidentLayer {
    /// The layer this guard was taken for; handed back by `drop_eviction_guard`.
    owner: Layer,
    /// Strong reference to the downloaded state that the accessors on this type load from.
    downloaded: Arc<DownloadedLayer>,
}
1838 :
1839 : impl std::fmt::Display for ResidentLayer {
1840 4272 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1841 4272 : write!(f, "{}", self.owner)
1842 4272 : }
1843 : }
1844 :
1845 : impl std::fmt::Debug for ResidentLayer {
1846 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1847 0 : write!(f, "{}", self.owner)
1848 0 : }
1849 : }
1850 :
1851 : impl ResidentLayer {
1852 : /// Release the eviction guard, converting back into a plain [`Layer`].
1853 : ///
1854 : /// You can access the [`Layer`] also by using `as_ref`.
1855 840 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1856 840 : self.into()
1857 840 : }
1858 :
1859 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1860 0 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1861 : pub(crate) async fn load_keys<'a>(
1862 : &'a self,
1863 : ctx: &RequestContext,
1864 : ) -> anyhow::Result<Vec<pageserver_api::key::Key>> {
1865 : use LayerKind::*;
1866 :
1867 : let owner = &self.owner.0;
1868 : let inner = self.downloaded.get(owner, ctx).await?;
1869 :
1870 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1871 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1872 : // while it's being held.
1873 : self.owner.record_access(ctx);
1874 :
1875 : let res = match inner {
1876 : Delta(ref d) => delta_layer::DeltaLayerInner::load_keys(d, ctx).await,
1877 : Image(ref i) => image_layer::ImageLayerInner::load_keys(i, ctx).await,
1878 : };
1879 0 : res.with_context(|| format!("Layer index is corrupted for {self}"))
1880 : }
1881 :
1882 : /// Read all they keys in this layer which match the ShardIdentity, and write them all to
1883 : /// the provided writer. Return the number of keys written.
1884 16 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1885 : pub(crate) async fn filter(
1886 : &self,
1887 : shard_identity: &ShardIdentity,
1888 : writer: &mut ImageLayerWriter,
1889 : ctx: &RequestContext,
1890 : ) -> Result<usize, CompactionError> {
1891 : use LayerKind::*;
1892 :
1893 : match self
1894 : .downloaded
1895 : .get(&self.owner.0, ctx)
1896 : .await
1897 : .map_err(CompactionError::Other)?
1898 : {
1899 : Delta(_) => {
1900 : return Err(CompactionError::Other(anyhow::anyhow!(format!(
1901 : "cannot filter() on a delta layer {self}"
1902 : ))));
1903 : }
1904 : Image(i) => i
1905 : .filter(shard_identity, writer, ctx)
1906 : .await
1907 : .map_err(CompactionError::Other),
1908 : }
1909 : }
1910 :
1911 : /// Returns the amount of keys and values written to the writer.
1912 20 : pub(crate) async fn copy_delta_prefix(
1913 20 : &self,
1914 20 : writer: &mut super::delta_layer::DeltaLayerWriter,
1915 20 : until: Lsn,
1916 20 : ctx: &RequestContext,
1917 20 : ) -> anyhow::Result<usize> {
1918 : use LayerKind::*;
1919 :
1920 20 : let owner = &self.owner.0;
1921 20 :
1922 20 : match self.downloaded.get(owner, ctx).await? {
1923 20 : Delta(ref d) => d
1924 20 : .copy_prefix(writer, until, ctx)
1925 20 : .await
1926 20 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1927 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1928 : }
1929 20 : }
1930 :
1931 3651 : pub(crate) fn local_path(&self) -> &Utf8Path {
1932 3651 : &self.owner.0.path
1933 3651 : }
1934 :
1935 4376 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1936 4376 : self.owner.metadata()
1937 4376 : }
1938 :
1939 : /// Cast the layer to a delta, return an error if it is an image layer.
1940 1908 : pub(crate) async fn get_as_delta(
1941 1908 : &self,
1942 1908 : ctx: &RequestContext,
1943 1908 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1944 : use LayerKind::*;
1945 1908 : match self.downloaded.get(&self.owner.0, ctx).await? {
1946 1908 : Delta(ref d) => Ok(d),
1947 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1948 : }
1949 1908 : }
1950 :
1951 : /// Cast the layer to an image, return an error if it is a delta layer.
1952 136 : pub(crate) async fn get_as_image(
1953 136 : &self,
1954 136 : ctx: &RequestContext,
1955 136 : ) -> anyhow::Result<&image_layer::ImageLayerInner> {
1956 : use LayerKind::*;
1957 136 : match self.downloaded.get(&self.owner.0, ctx).await? {
1958 136 : Image(ref d) => Ok(d),
1959 0 : Delta(_) => Err(anyhow::anyhow!("delta layer")),
1960 : }
1961 136 : }
1962 : }
1963 :
1964 : impl AsLayerDesc for ResidentLayer {
1965 2382973 : fn layer_desc(&self) -> &PersistentLayerDesc {
1966 2382973 : self.owner.layer_desc()
1967 2382973 : }
1968 : }
1969 :
1970 : impl AsRef<Layer> for ResidentLayer {
1971 4068 : fn as_ref(&self) -> &Layer {
1972 4068 : &self.owner
1973 4068 : }
1974 : }
1975 :
1976 : /// Drop the eviction guard.
1977 : impl From<ResidentLayer> for Layer {
1978 840 : fn from(value: ResidentLayer) -> Self {
1979 840 : value.owner
1980 840 : }
1981 : }
1982 :
1983 : use metrics::IntCounter;
1984 :
/// Handles to the metrics maintained by the Layer implementation.
///
/// All of them are registered in the `Default` impl.
pub(crate) struct LayerImplMetrics {
    /// Evictions started in the Layer implementation.
    started_evictions: IntCounter,
    /// Evictions completed in the Layer implementation.
    completed_evictions: IntCounter,
    /// Evictions that were cancelled or failed, one counter per reason.
    cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,

    /// Deletions on drop pending in the Layer implementation.
    started_deletes: IntCounter,
    /// Deletions on drop completed in the Layer implementation.
    completed_deletes: IntCounter,
    /// Deletions on drop that failed, one counter per reason.
    failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,

    /// Unexpected or assumed-rare events, one counter per event.
    rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
    /// Times Layer initialization was cancelled.
    inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
    /// Time between evicting and re-downloading, observed in seconds.
    redownload_after: metrics::Histogram,
    /// Time eviction held the permit, observed in seconds.
    time_to_evict: metrics::Histogram,
}
1999 :
2000 : impl Default for LayerImplMetrics {
2001 108 : fn default() -> Self {
2002 : use enum_map::Enum;
2003 :
2004 : // reminder: these will be pageserver_layer_* with "_total" suffix
2005 :
2006 108 : let started_evictions = metrics::register_int_counter!(
2007 108 : "pageserver_layer_started_evictions",
2008 108 : "Evictions started in the Layer implementation"
2009 108 : )
2010 108 : .unwrap();
2011 108 : let completed_evictions = metrics::register_int_counter!(
2012 108 : "pageserver_layer_completed_evictions",
2013 108 : "Evictions completed in the Layer implementation"
2014 108 : )
2015 108 : .unwrap();
2016 108 :
2017 108 : let cancelled_evictions = metrics::register_int_counter_vec!(
2018 108 : "pageserver_layer_cancelled_evictions_count",
2019 108 : "Different reasons for evictions to have been cancelled or failed",
2020 108 : &["reason"]
2021 108 : )
2022 108 : .unwrap();
2023 108 :
2024 972 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2025 972 : let reason = EvictionCancelled::from_usize(i);
2026 972 : let s = reason.as_str();
2027 972 : cancelled_evictions.with_label_values(&[s])
2028 972 : }));
2029 108 :
2030 108 : let started_deletes = metrics::register_int_counter!(
2031 108 : "pageserver_layer_started_deletes",
2032 108 : "Deletions on drop pending in the Layer implementation"
2033 108 : )
2034 108 : .unwrap();
2035 108 : let completed_deletes = metrics::register_int_counter!(
2036 108 : "pageserver_layer_completed_deletes",
2037 108 : "Deletions on drop completed in the Layer implementation"
2038 108 : )
2039 108 : .unwrap();
2040 108 :
2041 108 : let failed_deletes = metrics::register_int_counter_vec!(
2042 108 : "pageserver_layer_failed_deletes_count",
2043 108 : "Different reasons for deletions on drop to have failed",
2044 108 : &["reason"]
2045 108 : )
2046 108 : .unwrap();
2047 108 :
2048 216 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2049 216 : let reason = DeleteFailed::from_usize(i);
2050 216 : let s = reason.as_str();
2051 216 : failed_deletes.with_label_values(&[s])
2052 216 : }));
2053 108 :
2054 108 : let rare_counters = metrics::register_int_counter_vec!(
2055 108 : "pageserver_layer_assumed_rare_count",
2056 108 : "Times unexpected or assumed rare event happened",
2057 108 : &["event"]
2058 108 : )
2059 108 : .unwrap();
2060 108 :
2061 756 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2062 756 : let event = RareEvent::from_usize(i);
2063 756 : let s = event.as_str();
2064 756 : rare_counters.with_label_values(&[s])
2065 756 : }));
2066 108 :
2067 108 : let inits_cancelled = metrics::register_int_counter!(
2068 108 : "pageserver_layer_inits_cancelled_count",
2069 108 : "Times Layer initialization was cancelled",
2070 108 : )
2071 108 : .unwrap();
2072 108 :
2073 108 : let redownload_after = {
2074 108 : let minute = 60.0;
2075 108 : let hour = 60.0 * minute;
2076 108 : metrics::register_histogram!(
2077 108 : "pageserver_layer_redownloaded_after",
2078 108 : "Time between evicting and re-downloading.",
2079 108 : vec![
2080 108 : 10.0,
2081 108 : 30.0,
2082 108 : minute,
2083 108 : 5.0 * minute,
2084 108 : 15.0 * minute,
2085 108 : 30.0 * minute,
2086 108 : hour,
2087 108 : 12.0 * hour,
2088 108 : ]
2089 108 : )
2090 108 : .unwrap()
2091 108 : };
2092 108 :
2093 108 : let time_to_evict = metrics::register_histogram!(
2094 108 : "pageserver_layer_eviction_held_permit_seconds",
2095 108 : "Time eviction held the permit.",
2096 108 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
2097 108 : )
2098 108 : .unwrap();
2099 108 :
2100 108 : Self {
2101 108 : started_evictions,
2102 108 : completed_evictions,
2103 108 : cancelled_evictions,
2104 108 :
2105 108 : started_deletes,
2106 108 : completed_deletes,
2107 108 : failed_deletes,
2108 108 :
2109 108 : rare_counters,
2110 108 : inits_cancelled,
2111 108 : redownload_after,
2112 108 : time_to_evict,
2113 108 : }
2114 108 : }
2115 : }
2116 :
2117 : impl LayerImplMetrics {
2118 84 : fn inc_started_evictions(&self) {
2119 84 : self.started_evictions.inc();
2120 84 : }
2121 68 : fn inc_completed_evictions(&self) {
2122 68 : self.completed_evictions.inc();
2123 68 : }
2124 16 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2125 16 : self.cancelled_evictions[reason].inc()
2126 16 : }
2127 :
2128 1032 : fn inc_started_deletes(&self) {
2129 1032 : self.started_deletes.inc();
2130 1032 : }
2131 999 : fn inc_completed_deletes(&self) {
2132 999 : self.completed_deletes.inc();
2133 999 : }
2134 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2135 0 : self.failed_deletes[reason].inc();
2136 0 : }
2137 :
2138 : /// Counted separatedly from failed layer deletes because we will complete the layer deletion
2139 : /// attempt regardless of failure to delete local file.
2140 0 : fn inc_delete_removes_failed(&self) {
2141 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2142 0 : }
2143 :
2144 : /// Expected rare just as cancellations are rare, but we could have cancellations separate from
2145 : /// the single caller which can start the download, so use this counter to separte them.
2146 0 : fn inc_init_completed_without_requester(&self) {
2147 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2148 0 : }
2149 :
2150 : /// Expected rare because cancellations are unexpected, and failures are unexpected
2151 0 : fn inc_download_failed_without_requester(&self) {
2152 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2153 0 : }
2154 :
2155 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2156 : ///
2157 : /// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
2158 : /// Option.
2159 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2160 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2161 0 : }
2162 :
2163 : /// These are only expected for [`Self::inc_init_cancelled`] amount when
2164 : /// running with remote storage.
2165 12 : fn inc_init_needed_no_download(&self) {
2166 12 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2167 12 : }
2168 :
2169 : /// Expected rare because all layer files should be readable and good
2170 0 : fn inc_permanent_loading_failures(&self) {
2171 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2172 0 : }
2173 :
2174 0 : fn inc_init_cancelled(&self) {
2175 0 : self.inits_cancelled.inc()
2176 0 : }
2177 :
2178 28 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2179 28 : self.redownload_after.observe(duration.as_secs_f64())
2180 28 : }
2181 :
2182 : /// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
2183 : /// instead cancel eviction if we would have read waiters. We cannot however separate reads
2184 : /// from other evictions, so this could have noise as well.
2185 0 : fn inc_evicted_with_waiters(&self) {
2186 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2187 0 : }
2188 :
2189 : /// Recorded at least initially as the permit is now acquired in async context before
2190 : /// spawn_blocking action.
2191 68 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2192 68 : self.time_to_evict.observe(duration.as_secs_f64())
2193 68 : }
2194 : }
2195 :
/// Reasons for an eviction to have been cancelled or failed.
///
/// Each variant maps, via `as_str`, to a value of the `reason` label on the
/// `pageserver_layer_cancelled_evictions_count` metric.
#[derive(Debug, Clone, Copy, enum_map::Enum)]
enum EvictionCancelled {
    LayerGone,
    TimelineGone,
    VersionCheckFailed,
    FileNotFound,
    RemoveFailed,
    AlreadyReinitialized,
    /// Not evicted because of a pending reinitialization
    LostToDownload,
    /// After eviction, there was a new layer access which cancelled the eviction.
    UpgradedBackOnAccess,
    UnexpectedEvictedState,
}
2210 :
2211 : impl EvictionCancelled {
2212 972 : fn as_str(&self) -> &'static str {
2213 972 : match self {
2214 108 : EvictionCancelled::LayerGone => "layer_gone",
2215 108 : EvictionCancelled::TimelineGone => "timeline_gone",
2216 108 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2217 108 : EvictionCancelled::FileNotFound => "file_not_found",
2218 108 : EvictionCancelled::RemoveFailed => "remove_failed",
2219 108 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2220 108 : EvictionCancelled::LostToDownload => "lost_to_download",
2221 108 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2222 108 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2223 : }
2224 972 : }
2225 : }
2226 :
/// Reasons for a deletion on drop to have failed.
///
/// Each variant maps, via `as_str`, to a value of the `reason` label on the
/// `pageserver_layer_failed_deletes_count` metric.
#[derive(enum_map::Enum)]
enum DeleteFailed {
    TimelineGone,
    DeleteSchedulingFailed,
}
2232 :
2233 : impl DeleteFailed {
2234 216 : fn as_str(&self) -> &'static str {
2235 216 : match self {
2236 108 : DeleteFailed::TimelineGone => "timeline_gone",
2237 108 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2238 : }
2239 216 : }
2240 : }
2241 :
/// Unexpected or assumed-rare events counted by `LayerImplMetrics`.
///
/// Each variant maps, via `as_str`, to a value of the `event` label on the
/// `pageserver_layer_assumed_rare_count` metric.
#[derive(enum_map::Enum)]
enum RareEvent {
    /// Incremented by `inc_delete_removes_failed`.
    RemoveOnDropFailed,
    /// Incremented by `inc_init_completed_without_requester`.
    InitCompletedWithoutRequester,
    /// Incremented by `inc_download_failed_without_requester`.
    DownloadFailedWithoutRequester,
    /// Incremented by `inc_raced_wanted_evicted_accesses`.
    UpgradedWantedEvicted,
    /// Incremented by `inc_init_needed_no_download`.
    InitWithoutDownload,
    /// Incremented by `inc_permanent_loading_failures`.
    PermanentLoadingFailure,
    /// Incremented by `inc_evicted_with_waiters`.
    EvictedWithWaiters,
}
2252 :
2253 : impl RareEvent {
2254 756 : fn as_str(&self) -> &'static str {
2255 : use RareEvent::*;
2256 :
2257 756 : match self {
2258 108 : RemoveOnDropFailed => "remove_on_drop_failed",
2259 108 : InitCompletedWithoutRequester => "init_completed_without",
2260 108 : DownloadFailedWithoutRequester => "download_failed_without",
2261 108 : UpgradedWantedEvicted => "raced_wanted_evicted",
2262 108 : InitWithoutDownload => "init_needed_no_download",
2263 108 : PermanentLoadingFailure => "permanent_loading_failure",
2264 108 : EvictedWithWaiters => "evicted_with_waiters",
2265 : }
2266 756 : }
2267 : }
2268 :
/// Shared [`LayerImplMetrics`] instance, built with `LayerImplMetrics::default` the first
/// time it is accessed.
pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
    once_cell::sync::Lazy::new(LayerImplMetrics::default);
|