Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::HistoricLayerInfo;
5 : use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
6 : use std::ops::Range;
7 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 : use std::sync::{Arc, Weak};
9 : use std::time::{Duration, SystemTime};
10 : use tracing::Instrument;
11 : use utils::id::TimelineId;
12 : use utils::lsn::Lsn;
13 : use utils::sync::{gate, heavier_once_cell};
14 :
15 : use crate::config::PageServerConf;
16 : use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
17 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
18 : use crate::task_mgr::TaskKind;
19 : use crate::tenant::timeline::{CompactionError, GetVectoredError};
20 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
21 :
22 : use super::delta_layer::{self};
23 : use super::image_layer::{self};
24 : use super::{
25 : AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
26 : LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
27 : };
28 :
29 : use utils::generation::Generation;
30 :
31 : #[cfg(test)]
32 : mod tests;
33 :
34 : #[cfg(test)]
35 : mod failpoints;
36 :
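/// Size cap for a single layer file; presumably chosen to stay safely under S3's 5 GB limit
/// for a single-part upload.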
37 : pub const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
38 :
39 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
40 : /// range of LSNs.
41 : ///
42 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
43 : /// layers are used to ingest incoming WAL, and provide fast access to the
44 : /// recent page versions. On-disk layers are stored as files on disk, and are
45 : /// immutable. This type represents the on-disk kind, while the in-memory kind is represented by
46 : /// [`InMemoryLayer`].
47 : ///
48 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
49 : /// A delta layer contains all modifications within a range of LSNs and keys.
50 : /// An image layer is a snapshot of all the data in a key-range, at a single
51 : /// LSN.
52 : ///
53 : /// This type models the on-disk layers, which can be evicted and downloaded on demand. As a
54 : /// general goal, read accesses should always win eviction and eviction should not wait for
55 : /// download.
56 : ///
57 : /// ### State transitions
58 : ///
59 : /// The internal state of `Layer` consists most importantly of the on-filesystem state and the
60 : /// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
61 : /// right size) or deleted.
62 : ///
63 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
64 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
65 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
66 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
67 : /// cancel the eviction.
68 : ///
69 : /// ```text
70 : /// +-----------------+ get_or_maybe_download +--------------------------------+
71 : /// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
72 : /// | ENOENT | /->| |
73 : /// +-----------------+ | +--------------------------------+
74 : /// ^ | | ^
75 : /// | get_or_maybe_download | | | get_or_maybe_download, either:
76 : /// evict_blocking | /-------------------------/ | | - upgrade weak to strong
77 : /// | | | | - re-initialize without download
78 : /// | | evict_and_wait | |
79 : /// +-----------------+ v |
80 : /// | not initialized | on_downloaded_layer_drop +--------------------------------------+
81 : /// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
82 : /// +-----------------+ +--------------------------------------+
83 : /// ```
84 : ///
85 : /// ### Unsupported
86 : ///
87 : /// - Evicting by the operator deleting files from the filesystem
88 : ///
89 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
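///
/// ### Example (sketch)
///
/// A rough illustration of the read-path half of the diagram above, assuming a `layer: Layer`
/// at hand; the names are illustrative and error handling is elided:
///
/// ```ignore
/// // A read wins over any pending eviction: this re-downloads the file if it was
/// // evicted and returns a guard that keeps the layer resident while held.
/// let resident: ResidentLayer = layer.download_and_keep_resident().await?;
///
/// // While the guard is alive, `evict_and_wait` cannot complete an eviction.
/// assert!(layer.is_likely_resident());
///
/// // Dropping the last guard makes the layer evictable again.
/// drop(resident);
/// ```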
90 : #[derive(Clone)]
91 : pub(crate) struct Layer(Arc<LayerInner>);
92 :
93 : impl std::fmt::Display for Layer {
94 4268 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 4268 : write!(
96 4268 : f,
97 4268 : "{}{}",
98 4268 : self.layer_desc().short_id(),
99 4268 : self.0.generation.get_suffix()
100 4268 : )
101 4268 : }
102 : }
103 :
104 : impl std::fmt::Debug for Layer {
105 8 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 8 : write!(f, "{}", self)
107 8 : }
108 : }
109 :
110 : impl AsLayerDesc for Layer {
111 3363509 : fn layer_desc(&self) -> &PersistentLayerDesc {
112 3363509 : self.0.layer_desc()
113 3363509 : }
114 : }
115 :
116 : impl PartialEq for Layer {
117 7 : fn eq(&self, other: &Self) -> bool {
118 7 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
119 7 : }
120 : }
121 :
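/// Returns the local path for a layer file, as used by `Layer::for_evicted` and
/// `Layer::finish_creating`. A sketch of the two shapes produced; the exact suffix rendering
/// comes from `Generation::get_suffix` and is only indicative here:
///
/// ```text
/// <timeline_path>/<layer_file_name>                          legacy style, generation is none
/// <timeline_path>/<layer_file_name>-v1<generation_suffix>    generation-aware style
/// ```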
122 3800 : pub(crate) fn local_layer_path(
123 3800 : conf: &PageServerConf,
124 3800 : tenant_shard_id: &TenantShardId,
125 3800 : timeline_id: &TimelineId,
126 3800 : layer_file_name: &LayerName,
127 3800 : generation: &Generation,
128 3800 : ) -> Utf8PathBuf {
129 3800 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
130 3800 :
131 3800 : if generation.is_none() {
132 : // Without a generation, we may only use legacy path style
133 0 : timeline_path.join(layer_file_name.to_string())
134 : } else {
135 3800 : timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
136 : }
137 3800 : }
138 :
139 : impl Layer {
140 : /// Creates a layer value for a file we know to not be resident.
141 0 : pub(crate) fn for_evicted(
142 0 : conf: &'static PageServerConf,
143 0 : timeline: &Arc<Timeline>,
144 0 : file_name: LayerName,
145 0 : metadata: LayerFileMetadata,
146 0 : ) -> Self {
147 0 : let local_path = local_layer_path(
148 0 : conf,
149 0 : &timeline.tenant_shard_id,
150 0 : &timeline.timeline_id,
151 0 : &file_name,
152 0 : &metadata.generation,
153 0 : );
154 0 :
155 0 : let desc = PersistentLayerDesc::from_filename(
156 0 : timeline.tenant_shard_id,
157 0 : timeline.timeline_id,
158 0 : file_name,
159 0 : metadata.file_size,
160 0 : );
161 0 :
162 0 : let owner = Layer(Arc::new(LayerInner::new(
163 0 : conf,
164 0 : timeline,
165 0 : local_path,
166 0 : desc,
167 0 : None,
168 0 : metadata.generation,
169 0 : metadata.shard,
170 0 : )));
171 0 :
172 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
173 :
174 0 : owner
175 0 : }
176 :
177 : /// Creates a Layer value for a file we know to be resident in the timeline directory.
178 244 : pub(crate) fn for_resident(
179 244 : conf: &'static PageServerConf,
180 244 : timeline: &Arc<Timeline>,
181 244 : local_path: Utf8PathBuf,
182 244 : file_name: LayerName,
183 244 : metadata: LayerFileMetadata,
184 244 : ) -> ResidentLayer {
185 244 : let desc = PersistentLayerDesc::from_filename(
186 244 : timeline.tenant_shard_id,
187 244 : timeline.timeline_id,
188 244 : file_name,
189 244 : metadata.file_size,
190 244 : );
191 244 :
192 244 : let mut resident = None;
193 244 :
194 244 : let owner = Layer(Arc::new_cyclic(|owner| {
195 244 : let inner = Arc::new(DownloadedLayer {
196 244 : owner: owner.clone(),
197 244 : kind: tokio::sync::OnceCell::default(),
198 244 : version: 0,
199 244 : });
200 244 : resident = Some(inner.clone());
201 244 :
202 244 : LayerInner::new(
203 244 : conf,
204 244 : timeline,
205 244 : local_path,
206 244 : desc,
207 244 : Some(inner),
208 244 : metadata.generation,
209 244 : metadata.shard,
210 244 : )
211 244 : }));
212 244 :
213 244 : let downloaded = resident.expect("just initialized");
214 244 :
215 244 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
216 :
217 244 : timeline
218 244 : .metrics
219 244 : .resident_physical_size_add(metadata.file_size);
220 244 :
221 244 : ResidentLayer { downloaded, owner }
222 244 : }
223 :
224 : /// Creates a Layer value for a freshly written new layer file by renaming it from a
225 : /// temporary path.
226 3588 : pub(crate) fn finish_creating(
227 3588 : conf: &'static PageServerConf,
228 3588 : timeline: &Arc<Timeline>,
229 3588 : desc: PersistentLayerDesc,
230 3588 : temp_path: &Utf8Path,
231 3588 : ) -> anyhow::Result<ResidentLayer> {
232 3588 : let mut resident = None;
233 3588 :
234 3588 : let owner = Layer(Arc::new_cyclic(|owner| {
235 3588 : let inner = Arc::new(DownloadedLayer {
236 3588 : owner: owner.clone(),
237 3588 : kind: tokio::sync::OnceCell::default(),
238 3588 : version: 0,
239 3588 : });
240 3588 : resident = Some(inner.clone());
241 3588 :
242 3588 : let local_path = local_layer_path(
243 3588 : conf,
244 3588 : &timeline.tenant_shard_id,
245 3588 : &timeline.timeline_id,
246 3588 : &desc.layer_name(),
247 3588 : &timeline.generation,
248 3588 : );
249 3588 :
250 3588 : LayerInner::new(
251 3588 : conf,
252 3588 : timeline,
253 3588 : local_path,
254 3588 : desc,
255 3588 : Some(inner),
256 3588 : timeline.generation,
257 3588 : timeline.get_shard_index(),
258 3588 : )
259 3588 : }));
260 3588 :
261 3588 : let downloaded = resident.expect("just initialized");
262 3588 :
263 3588 : // We never want to overwrite an existing file, so we use `RENAME_NOREPLACE`.
264 3588 : // TODO: this leaves the temp file in place if the rename fails, risking us running
265 3588 : // out of space. Should we clean it up here or does the calling context deal with this?
266 3588 : utils::fs_ext::rename_noreplace(temp_path.as_std_path(), owner.local_path().as_std_path())
267 3588 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
268 :
269 3588 : Ok(ResidentLayer { downloaded, owner })
270 3588 : }
271 :
272 : /// Requests the layer to be evicted and waits for this to be done.
273 : ///
274 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
275 : ///
276 : /// If, due to bad luck or a blocked executor, we miss the actual eviction and the layer is
277 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
278 : ///
279 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
280 : /// will happen regardless of whether the future returned by this method completes, unless there is a
281 : /// read access before eviction gets to complete.
282 : ///
283 : /// Technically cancellation safe, but cancelling might shift which generation of the
284 : /// download-evict cycle is observed on retry.
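///
/// A minimal sketch of how a caller might interpret the outcome (the ten-second timeout is
/// illustrative only):
///
/// ```ignore
/// use std::time::Duration;
///
/// match layer.evict_and_wait(Duration::from_secs(10)).await {
///     // the local file was removed
///     Ok(()) => {}
///     // there was nothing resident to evict
///     Err(EvictionError::NotFound) => {}
///     // a read re-initialized the layer before the eviction could complete
///     Err(EvictionError::Downloaded) => {}
///     // we stopped waiting; the eviction may still complete on its own
///     Err(EvictionError::Timeout) => {}
/// }
/// ```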
285 72 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
286 72 : self.0.evict_and_wait(timeout).await
287 64 : }
288 :
289 : /// Delete the layer file when `self` gets dropped, and also try to schedule a remote index
290 : /// upload afterwards.
291 : ///
292 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
293 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
294 : /// the value this is called on gets dropped.
295 : ///
296 : /// This is ensured by both of those methods accepting references to Layer.
297 : ///
298 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
299 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
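///
/// A sketch of the intended call order (`layer` is illustrative; the unlinking itself is done
/// by gc or compaction as described above):
///
/// ```ignore
/// // the layer has already been unlinked from the layer map by gc or compaction
/// layer.delete_on_drop();
///
/// // dropping the last `Layer` clone removes the local file and schedules
/// // the remote deletion via the remote timeline client
/// drop(layer);
/// ```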
300 1040 : pub(crate) fn delete_on_drop(&self) {
301 1040 : self.0.delete_on_drop();
302 1040 : }
303 :
304 480190 : pub(crate) async fn get_values_reconstruct_data(
305 480190 : &self,
306 480190 : keyspace: KeySpace,
307 480190 : lsn_range: Range<Lsn>,
308 480190 : reconstruct_data: &mut ValuesReconstructState,
309 480190 : ctx: &RequestContext,
310 480190 : ) -> Result<(), GetVectoredError> {
311 480190 : let downloaded = self
312 480190 : .0
313 480190 : .get_or_maybe_download(true, Some(ctx))
314 480190 : .await
315 480190 : .map_err(|err| match err {
316 : DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
317 0 : GetVectoredError::Cancelled
318 : }
319 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
320 480190 : })?;
321 480190 : let this = ResidentLayer {
322 480190 : downloaded: downloaded.clone(),
323 480190 : owner: self.clone(),
324 480190 : };
325 480190 :
326 480190 : self.record_access(ctx);
327 480190 :
328 480190 : downloaded
329 480190 : .get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
330 480190 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
331 480190 : .await
332 480190 : .map_err(|err| match err {
333 0 : GetVectoredError::Other(err) => GetVectoredError::Other(
334 0 : err.context(format!("get_values_reconstruct_data for layer {self}")),
335 0 : ),
336 0 : err => err,
337 480190 : })
338 480190 : }
339 :
340 : /// Download the layer if evicted.
341 : ///
342 : /// Will not error when the layer is already downloaded.
343 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
344 0 : self.0.get_or_maybe_download(true, None).await?;
345 0 : Ok(())
346 0 : }
347 :
348 608 : pub(crate) async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
349 608 : self.0.needs_download().await
350 608 : }
351 :
352 : /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
353 : /// while the guard exists.
354 : ///
355 : /// Returns None if the layer is currently evicted or becoming evicted.
356 : #[cfg(test)]
357 40 : pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
358 40 : let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
359 :
360 28 : Some(ResidentLayer {
361 28 : downloaded,
362 28 : owner: self.clone(),
363 28 : })
364 40 : }
365 :
366 : /// Weak indicator of whether the layer is resident or not. Good enough for eviction, which can deal
367 : /// with `EvictionError::NotFound`.
368 : ///
369 : /// Returns `true` if this layer might be resident, or `false` if it is most likely evicted
370 : /// or soon will be, unless a read happens first.
371 111 : pub(crate) fn is_likely_resident(&self) -> bool {
372 111 : self.0
373 111 : .inner
374 111 : .get()
375 111 : .map(|rowe| rowe.is_likely_resident())
376 111 : .unwrap_or(false)
377 111 : }
378 :
379 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
380 1132 : pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
381 1132 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
382 :
383 1132 : Ok(ResidentLayer {
384 1132 : downloaded,
385 1132 : owner: self.clone(),
386 1132 : })
387 1132 : }
388 :
389 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
390 0 : self.0.info(reset)
391 0 : }
392 :
393 16 : pub(crate) fn latest_activity(&self) -> SystemTime {
394 16 : self.0.access_stats.latest_activity()
395 16 : }
396 :
397 20 : pub(crate) fn visibility(&self) -> LayerVisibilityHint {
398 20 : self.0.access_stats.visibility()
399 20 : }
400 :
401 3592 : pub(crate) fn local_path(&self) -> &Utf8Path {
402 3592 : &self.0.path
403 3592 : }
404 :
405 4688 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
406 4688 : self.0.metadata()
407 4688 : }
408 :
409 0 : pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
410 0 : self.0
411 0 : .timeline
412 0 : .upgrade()
413 0 : .map(|timeline| timeline.timeline_id)
414 0 : }
415 :
416 : /// Traditional debug dumping facility
417 : #[allow(unused)]
418 8 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
419 8 : self.0.desc.dump();
420 8 :
421 8 : if verbose {
422 : // for now, unconditionally download everything, even if that might not be wanted.
423 8 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
424 8 : l.dump(&self.0, ctx).await?
425 0 : }
426 :
427 8 : Ok(())
428 8 : }
429 :
430 : /// Waits until this layer has been dropped (and if needed, local file deletion and remote
431 : /// deletion scheduling has completed).
432 : ///
433 : /// Does not start local deletion; use [`Self::delete_on_drop`] for that
434 : /// separately.
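///
/// A sketch of test-only usage, assuming a `layer: Layer` whose other clones are about to be
/// dropped:
///
/// ```ignore
/// let waiter = layer.wait_drop();
/// layer.delete_on_drop();
/// drop(layer);
/// waiter.await; // resolves once the drop-time deletion work has completed
/// ```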
435 : #[cfg(any(feature = "testing", test))]
436 4 : pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
437 4 : let mut rx = self.0.status.as_ref().unwrap().subscribe();
438 :
439 4 : async move {
440 : loop {
441 4 : if rx.changed().await.is_err() {
442 4 : break;
443 0 : }
444 : }
445 4 : }
446 4 : }
447 :
448 480190 : fn record_access(&self, ctx: &RequestContext) {
449 480190 : if self.0.access_stats.record_access(ctx) {
450 : // Visibility was modified to Visible: maybe log about this
451 0 : match ctx.task_kind() {
452 : TaskKind::CalculateSyntheticSize
453 : | TaskKind::OndemandLogicalSizeCalculation
454 : | TaskKind::GarbageCollector
455 0 : | TaskKind::MgmtRequest => {
456 0 : // This situation is expected in code paths that do binary searches of the LSN space to resolve
457 0 : // an LSN to a timestamp, which happens during GC, during GC cutoff calculations in synthetic size,
458 0 : // and on-demand for certain HTTP API requests. On-demand logical size calculation is also included
459 0 : // because it is run as a sub-task of synthetic size.
460 0 : }
461 : _ => {
462 : // In all other contexts, it is unusual to do I/O involving layers which are not visible at
463 : // some branch tip, so we log the fact that we are accessing something that the visibility
464 : // calculation thought should not be visible.
465 : //
466 : // This case is legal in brief time windows: for example an in-flight getpage request can hold on to a layer object
467 : // which was covered by a concurrent compaction.
468 0 : tracing::info!(
469 : layer=%self,
470 0 : "became visible as a result of access",
471 : );
472 : }
473 : }
474 :
475 : // Update the timeline's visible bytes count
476 0 : if let Some(tl) = self.0.timeline.upgrade() {
477 0 : tl.metrics
478 0 : .visible_physical_size_gauge
479 0 : .add(self.0.desc.file_size)
480 0 : }
481 480190 : }
482 480190 : }
483 :
484 692 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
485 692 : let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
486 : use LayerVisibilityHint::*;
487 692 : match (old_visibility, visibility) {
488 : (Visible, Covered) => {
489 : // Subtract this layer's contribution to the visible size metric
490 72 : if let Some(tl) = self.0.timeline.upgrade() {
491 72 : debug_assert!(
492 72 : tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
493 : );
494 72 : tl.metrics
495 72 : .visible_physical_size_gauge
496 72 : .sub(self.0.desc.file_size)
497 0 : }
498 : }
499 : (Covered, Visible) => {
500 : // Add this layer's contribution to the visible size metric
501 0 : if let Some(tl) = self.0.timeline.upgrade() {
502 0 : tl.metrics
503 0 : .visible_physical_size_gauge
504 0 : .add(self.0.desc.file_size)
505 0 : }
506 : }
507 620 : (Covered, Covered) | (Visible, Visible) => {
508 620 : // no change
509 620 : }
510 : }
511 692 : }
512 : }
513 :
514 : /// A downloaded layer ([`DownloadedLayer`]) can be either resident or wanted evicted.
515 : ///
516 : /// However, when we want something evicted, we cannot evict it right away as there might be current
517 : /// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
518 : /// read with [`Layer::get_values_reconstruct_data`].
519 : ///
520 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
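///
/// A self-contained sketch of the underlying `Arc`/`Weak` mechanics using plain std types (not
/// the actual [`DownloadedLayer`]): a wanted-evicted layer only holds a `Weak`, so an in-flight
/// read holding a strong reference can still "resurrect" it.
///
/// ```ignore
/// use std::sync::{Arc, Weak};
///
/// let resident: Arc<String> = Arc::new("downloaded layer".to_owned());
/// let wanted_evicted: Weak<String> = Arc::downgrade(&resident);
///
/// // an ongoing read still holds a strong reference, so upgrading (re-initialization) succeeds
/// assert!(wanted_evicted.upgrade().is_some());
///
/// // once the last strong reference is gone, the eviction can actually go ahead
/// drop(resident);
/// assert!(wanted_evicted.upgrade().is_none());
/// ```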
521 : #[derive(Debug)]
522 : enum ResidentOrWantedEvicted {
523 : Resident(Arc<DownloadedLayer>),
524 : WantedEvicted(Weak<DownloadedLayer>, usize),
525 : }
526 :
527 : impl ResidentOrWantedEvicted {
528 : /// Non-mutating access to the a DownloadedLayer, if possible.
529 : ///
530 : /// This is not used on the read path (anything that calls
531 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
532 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
533 : #[cfg(test)]
534 28 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
535 28 : match self {
536 28 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
537 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
538 : }
539 28 : }
540 :
541 : /// Best-effort query for residency right now, not as strong guarantee as receiving a strong
542 : /// reference from `ResidentOrWantedEvicted::get`.
543 99 : fn is_likely_resident(&self) -> bool {
544 99 : match self {
545 87 : ResidentOrWantedEvicted::Resident(_) => true,
546 12 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
547 : }
548 99 : }
549 :
550 : /// Upgrades any weak to strong if possible.
551 : ///
552 : /// Returns a strong reference if possible, along with a boolean telling whether an upgrade
553 : /// happened.
554 481338 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
555 481338 : match self {
556 481322 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
557 16 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
558 0 : Some(strong) => {
559 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
560 0 :
561 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
562 0 :
563 0 : Some((strong, true))
564 : }
565 16 : None => None,
566 : },
567 : }
568 481338 : }
569 :
570 : /// When eviction is first requested, drop down to holding a [`Weak`].
571 : ///
572 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
573 : /// drop the possibly last strong reference outside of the mutex of
574 : /// [`heavier_once_cell::OnceCell`].
575 60 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
576 60 : match self {
577 52 : ResidentOrWantedEvicted::Resident(strong) => {
578 52 : let weak = Arc::downgrade(strong);
579 52 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
580 52 : std::mem::swap(self, &mut temp);
581 52 : match temp {
582 52 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
583 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
584 : }
585 : }
586 8 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
587 : }
588 60 : }
589 : }
590 :
591 : struct LayerInner {
592 : /// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of
593 : /// [`Self::path`].
594 : conf: &'static PageServerConf,
595 :
596 : /// Full path to the file; unclear if this should exist anymore.
597 : path: Utf8PathBuf,
598 :
599 : desc: PersistentLayerDesc,
600 :
601 : /// Timeline access is needed for remote timeline client and metrics.
602 : ///
603 : /// There should not be an access to timeline for any reason without entering the
604 : /// [`Timeline::gate`] at the same time.
605 : timeline: Weak<Timeline>,
606 :
607 : access_stats: LayerAccessStats,
608 :
609 : /// This custom OnceCell is backed by std mutex, but only held for short time periods.
610 : ///
611 : /// Filesystem changes (download, evict) are only done while holding a permit which the
612 : /// `heavier_once_cell` provides.
613 : ///
614 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
615 : /// possibly read while not holding it.
616 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
617 :
618 : /// Do we want to delete locally and remotely this when `LayerInner` is dropped
619 : wanted_deleted: AtomicBool,
620 :
621 : /// Version is to make sure we will only evict a specific initialization of the downloaded file.
622 : ///
623 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
624 : /// `ResidentOrWantedEvicted::WantedEvicted`.
625 : version: AtomicUsize,
626 :
627 : /// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
628 : /// starts, or completes.
629 : ///
630 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
631 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
632 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
633 : /// [`ResidentOrWantedEvicted::Resident`] on access.
634 : ///
635 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
636 : status: Option<tokio::sync::watch::Sender<Status>>,
637 :
638 : /// Counter for exponential backoff with the download.
639 : ///
640 : /// This is atomic only for the purposes of having additional data only accessed while holding
641 : /// the InitPermit.
642 : consecutive_failures: AtomicUsize,
643 :
644 : /// The generation of this Layer.
645 : ///
646 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
647 : /// for created layers from [`Timeline::generation`].
648 : generation: Generation,
649 :
650 : /// The shard of this Layer.
651 : ///
652 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
653 : /// current `ShardIdentity` (TODO: add link once it's introduced).
654 : ///
655 : /// For loaded layers, this may be some other value if the tenant has undergone
656 : /// a shard split since the layer was originally written.
657 : shard: ShardIndex,
658 :
659 : /// When the Layer was last evicted but has not been downloaded since.
660 : ///
661 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
662 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
663 :
664 : #[cfg(test)]
665 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
666 : }
667 :
668 : impl std::fmt::Display for LayerInner {
669 60 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670 60 : write!(f, "{}", self.layer_desc().short_id())
671 60 : }
672 : }
673 :
674 : impl AsLayerDesc for LayerInner {
675 3368689 : fn layer_desc(&self) -> &PersistentLayerDesc {
676 3368689 : &self.desc
677 3368689 : }
678 : }
679 :
680 : #[derive(Debug, Clone, Copy)]
681 : enum Status {
682 : Resident,
683 : Evicted,
684 : Downloading,
685 : }
686 :
687 : impl Drop for LayerInner {
688 1416 : fn drop(&mut self) {
689 1416 : // if there was a pending eviction, mark it cancelled here to balance metrics
690 1416 : if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
691 4 : {
692 4 : // eviction has already been started
693 4 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
694 4 :
695 4 : // eviction request is intentionally not honored as no one is present to wait for it
696 4 : // and we could be delaying shutdown for nothing.
697 1412 : }
698 :
699 1416 : let timeline = self.timeline.upgrade();
700 :
701 1416 : if let Some(timeline) = timeline.as_ref() {
702 : // Only need to decrement metrics if the timeline still exists: otherwise
703 : // it will have already de-registered these metrics via TimelineMetrics::shutdown
704 1384 : if self.desc.is_delta() {
705 1241 : timeline.metrics.layer_count_delta.dec();
706 1241 : timeline.metrics.layer_size_delta.sub(self.desc.file_size);
707 1241 : } else {
708 143 : timeline.metrics.layer_count_image.dec();
709 143 : timeline.metrics.layer_size_image.sub(self.desc.file_size);
710 143 : }
711 :
712 1384 : if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
713 1384 : debug_assert!(
714 1384 : timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
715 : );
716 1384 : timeline
717 1384 : .metrics
718 1384 : .visible_physical_size_gauge
719 1384 : .sub(self.desc.file_size);
720 0 : }
721 32 : }
722 :
723 1416 : if !*self.wanted_deleted.get_mut() {
724 392 : return;
725 1024 : }
726 :
727 1024 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
728 :
729 1024 : let path = std::mem::take(&mut self.path);
730 1024 : let file_name = self.layer_desc().layer_name();
731 1024 : let file_size = self.layer_desc().file_size;
732 1024 : let meta = self.metadata();
733 1024 : let status = self.status.take();
734 1024 :
735 1024 : Self::spawn_blocking(move || {
736 1024 : let _g = span.entered();
737 1024 :
738 1024 : // carry this until we are finished for [`Layer::wait_drop`] support
739 1024 : let _status = status;
740 :
741 1024 : let Some(timeline) = timeline else {
742 : // no need to nag that timeline is gone: under normal situation on
743 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
744 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
745 0 : return;
746 : };
747 :
748 1024 : let Ok(_guard) = timeline.gate.enter() else {
749 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
750 0 : return;
751 : };
752 :
753 1024 : let removed = match std::fs::remove_file(path) {
754 1020 : Ok(()) => true,
755 4 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
756 4 : // until we no longer do detaches by removing all local files before removing the
757 4 : // tenant from the global map, we will always get these errors even if we knew what
758 4 : // the latest state is.
759 4 : //
760 4 : // we currently do not track the latest state, so we'll also end up here on evicted
761 4 : // layers.
762 4 : false
763 : }
764 0 : Err(e) => {
765 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
766 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
767 0 : false
768 : }
769 : };
770 :
771 1024 : if removed {
772 1020 : timeline.metrics.resident_physical_size_sub(file_size);
773 1020 : }
774 1024 : let res = timeline
775 1024 : .remote_client
776 1024 : .schedule_deletion_of_unlinked(vec![(file_name, meta)]);
777 :
778 1024 : if let Err(e) = res {
779 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
780 : // demonstrating this deadlock (without spawn_blocking): stop will drop
781 : // queued items, which will have ResidentLayers, and those drops would try
782 : // to re-entrantly lock the RemoteTimelineClient inner state.
783 4 : if !timeline.is_active() {
784 4 : tracing::info!("scheduling deletion on drop failed: {e:#}");
785 : } else {
786 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
787 : }
788 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
789 1020 : } else {
790 1020 : LAYER_IMPL_METRICS.inc_completed_deletes();
791 1020 : }
792 1024 : });
793 1416 : }
794 : }
795 :
796 : impl LayerInner {
797 : #[allow(clippy::too_many_arguments)]
798 3832 : fn new(
799 3832 : conf: &'static PageServerConf,
800 3832 : timeline: &Arc<Timeline>,
801 3832 : local_path: Utf8PathBuf,
802 3832 : desc: PersistentLayerDesc,
803 3832 : downloaded: Option<Arc<DownloadedLayer>>,
804 3832 : generation: Generation,
805 3832 : shard: ShardIndex,
806 3832 : ) -> Self {
807 3832 : let (inner, version, init_status) = if let Some(inner) = downloaded {
808 3832 : let version = inner.version;
809 3832 : let resident = ResidentOrWantedEvicted::Resident(inner);
810 3832 : (
811 3832 : heavier_once_cell::OnceCell::new(resident),
812 3832 : version,
813 3832 : Status::Resident,
814 3832 : )
815 : } else {
816 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
817 : };
818 :
819 : // This object acts as a RAII guard on these metrics: increment on construction
820 3832 : if desc.is_delta() {
821 3116 : timeline.metrics.layer_count_delta.inc();
822 3116 : timeline.metrics.layer_size_delta.add(desc.file_size);
823 3116 : } else {
824 716 : timeline.metrics.layer_count_image.inc();
825 716 : timeline.metrics.layer_size_image.add(desc.file_size);
826 716 : }
827 :
828 : // New layers are visible by default. This metric is later updated on drop or in set_visibility
829 3832 : timeline
830 3832 : .metrics
831 3832 : .visible_physical_size_gauge
832 3832 : .add(desc.file_size);
833 3832 :
834 3832 : LayerInner {
835 3832 : conf,
836 3832 : path: local_path,
837 3832 : desc,
838 3832 : timeline: Arc::downgrade(timeline),
839 3832 : access_stats: Default::default(),
840 3832 : wanted_deleted: AtomicBool::new(false),
841 3832 : inner,
842 3832 : version: AtomicUsize::new(version),
843 3832 : status: Some(tokio::sync::watch::channel(init_status).0),
844 3832 : consecutive_failures: AtomicUsize::new(0),
845 3832 : generation,
846 3832 : shard,
847 3832 : last_evicted_at: std::sync::Mutex::default(),
848 3832 : #[cfg(test)]
849 3832 : failpoints: Default::default(),
850 3832 : }
851 3832 : }
852 :
853 1040 : fn delete_on_drop(&self) {
854 1040 : let res =
855 1040 : self.wanted_deleted
856 1040 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
857 1040 :
858 1040 : if res.is_ok() {
859 1032 : LAYER_IMPL_METRICS.inc_started_deletes();
860 1032 : }
861 1040 : }
862 :
863 : /// Cancellation safe; however, dropping the future and calling this method again might result
864 : /// in a new attempt to evict OR join the previously started attempt.
865 72 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
866 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
867 : let mut rx = self.status.as_ref().unwrap().subscribe();
868 :
869 : {
870 : let current = rx.borrow_and_update();
871 : match &*current {
872 : Status::Resident => {
873 : // we might get lucky and evict this; continue
874 : }
875 : Status::Evicted | Status::Downloading => {
876 : // it is already evicted
877 : return Err(EvictionError::NotFound);
878 : }
879 : }
880 : }
881 :
882 : let strong = {
883 : match self.inner.get() {
884 : Some(mut either) => either.downgrade(),
885 : None => {
886 : // we already have a scheduled eviction, which just has not gotten to run yet.
887 : // it might still race with a read access, but that could also get cancelled,
888 : // so let's say this is not evictable.
889 : return Err(EvictionError::NotFound);
890 : }
891 : }
892 : };
893 :
894 : if strong.is_some() {
895 : // drop the DownloadedLayer outside of the holding the guard
896 : drop(strong);
897 :
898 : // the idea here is that only one evicter should ever get to witness a strong reference,
899 : // which means whenever get_or_maybe_download upgrades a weak, it must record a
900 : // cancelled eviction and signal us, like it currently does.
901 : //
902 : // a second concurrent evict_and_wait will not see a strong reference.
903 : LAYER_IMPL_METRICS.inc_started_evictions();
904 : }
905 :
906 : let changed = rx.changed();
907 : let changed = tokio::time::timeout(timeout, changed).await;
908 :
909 : let Ok(changed) = changed else {
910 : return Err(EvictionError::Timeout);
911 : };
912 :
913 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
914 :
915 : let current = rx.borrow_and_update();
916 :
917 : match &*current {
918 : // the easiest case
919 : Status::Evicted => Ok(()),
920 : // it surely was evicted in between, but then there was a new access now; we can't know
921 : // if it'll succeed, so let's just call it evicted
922 : Status::Downloading => Ok(()),
923 : // either the download which was started after eviction completed already, or it was
924 : // never evicted
925 : Status::Resident => Err(EvictionError::Downloaded),
926 : }
927 : }
928 :
929 : /// Cancellation safe.
930 481354 : async fn get_or_maybe_download(
931 481354 : self: &Arc<Self>,
932 481354 : allow_download: bool,
933 481354 : ctx: Option<&RequestContext>,
934 481354 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
935 32 : let (weak, permit) = {
936 : // get_or_init_detached can:
937 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
938 : // - be slow (wait for semaphore permit or closing)
939 481354 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
940 :
941 481354 : let locked = self
942 481354 : .inner
943 481354 : .get_or_init_detached()
944 481354 : .await
945 481354 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
946 481354 :
947 481354 : scopeguard::ScopeGuard::into_inner(init_cancelled);
948 :
949 481322 : match locked {
950 : // this path could had been a RwLock::read
951 481322 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
952 0 : Ok(Ok((strong, _))) => {
953 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
954 0 : // previously a `evict_and_wait` was received. this is the only place when we
955 0 : // send out an update without holding the InitPermit.
956 0 : //
957 0 : // note that we also have dropped the Guard; this is fine, because we just made
958 0 : // a state change and are holding a strong reference to be returned.
959 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
960 0 : LAYER_IMPL_METRICS
961 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
962 0 :
963 0 : return Ok(strong);
964 : }
965 16 : Ok(Err(guard)) => {
966 16 : // path to here: we won the eviction, the file should still be on the disk.
967 16 : let (weak, permit) = guard.take_and_deinit();
968 16 : (Some(weak), permit)
969 : }
970 16 : Err(permit) => (None, permit),
971 : }
972 : };
973 :
974 32 : if let Some(weak) = weak {
975 : // only drop the weak after dropping the heavier_once_cell guard
976 16 : assert!(
977 16 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
978 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
979 : );
980 16 : }
981 :
982 32 : let timeline = self
983 32 : .timeline
984 32 : .upgrade()
985 32 : .ok_or(DownloadError::TimelineShutdown)?;
986 :
987 : // count cancellations, which currently remain largely unexpected
988 32 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
989 :
990 : // check if we really need to be downloaded: this can happen if a read access won the
991 : // semaphore before eviction.
992 : //
993 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
994 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
995 32 : let needs_download = self
996 32 : .needs_download()
997 32 : .await
998 32 : .map_err(DownloadError::PreStatFailed);
999 32 :
1000 32 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1001 :
1002 32 : let needs_download = needs_download?;
1003 :
1004 32 : let Some(reason) = needs_download else {
1005 : // the file is present locally because eviction has not had a chance to run yet
1006 :
1007 : #[cfg(test)]
1008 16 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
1009 16 : .await?;
1010 :
1011 12 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
1012 12 :
1013 12 : return Ok(self.initialize_after_layer_is_on_disk(permit));
1014 : };
1015 :
1016 : // we must download; getting cancelled before spawning the download is not an issue as
1017 : // any still running eviction would not find anything to evict.
1018 :
1019 16 : if let NeedsDownload::NotFile(ft) = reason {
1020 0 : return Err(DownloadError::NotFile(ft));
1021 16 : }
1022 :
1023 16 : if let Some(ctx) = ctx {
1024 4 : self.check_expected_download(ctx)?;
1025 12 : }
1026 :
1027 16 : if !allow_download {
1028 : // this is only used from tests, but it is hard to test without the boolean
1029 4 : return Err(DownloadError::DownloadRequired);
1030 12 : }
1031 12 :
1032 12 : let download_ctx = ctx
1033 12 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
1034 12 : .unwrap_or(RequestContext::new(
1035 12 : TaskKind::LayerDownload,
1036 12 : DownloadBehavior::Download,
1037 12 : ));
1038 :
1039 12 : async move {
1040 12 : tracing::info!(%reason, "downloading on-demand");
1041 :
1042 12 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1043 12 : let res = self
1044 12 : .download_init_and_wait(timeline, permit, download_ctx)
1045 12 : .await?;
1046 12 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1047 12 : Ok(res)
1048 12 : }
1049 12 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
1050 12 : .await
1051 481354 : }
1052 :
1053 : /// Nag or fail per RequestContext policy
1054 4 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1055 : use crate::context::DownloadBehavior::*;
1056 4 : let b = ctx.download_behavior();
1057 4 : match b {
1058 4 : Download => Ok(()),
1059 : Warn | Error => {
1060 0 : tracing::info!(
1061 0 : "unexpectedly on-demand downloading for task kind {:?}",
1062 0 : ctx.task_kind()
1063 : );
1064 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1065 :
1066 0 : let really_error =
1067 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1068 :
1069 0 : if really_error {
1070 : // this check is only probabilistic; seems like a flakiness footgun
1071 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1072 : } else {
1073 0 : Ok(())
1074 : }
1075 : }
1076 : }
1077 4 : }
1078 :
1079 : /// Actual download, at most one is executed at the time.
1080 12 : async fn download_init_and_wait(
1081 12 : self: &Arc<Self>,
1082 12 : timeline: Arc<Timeline>,
1083 12 : permit: heavier_once_cell::InitPermit,
1084 12 : ctx: RequestContext,
1085 12 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1086 12 : debug_assert_current_span_has_tenant_and_timeline_id();
1087 12 :
1088 12 : let (tx, rx) = tokio::sync::oneshot::channel();
1089 12 :
1090 12 : let this: Arc<Self> = self.clone();
1091 :
1092 12 : let guard = timeline
1093 12 : .gate
1094 12 : .enter()
1095 12 : .map_err(|_| DownloadError::DownloadCancelled)?;
1096 :
1097 12 : Self::spawn(
1098 12 : async move {
1099 0 : let _guard = guard;
1100 0 :
1101 0 : // now that we have commited to downloading, send out an update to:
1102 0 : // - unhang any pending eviction
1103 0 : // - break out of evict_and_wait
1104 0 : this.status
1105 0 : .as_ref()
1106 0 : .unwrap()
1107 0 : .send_replace(Status::Downloading);
1108 12 :
1109 12 : #[cfg(test)]
1110 12 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1111 12 : .await
1112 12 : .unwrap();
1113 :
1114 12 : let res = this.download_and_init(timeline, permit, &ctx).await;
1115 :
1116 12 : if let Err(res) = tx.send(res) {
1117 0 : match res {
1118 0 : Ok(_res) => {
1119 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1120 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1121 : }
1122 0 : Err(e) => {
1123 0 : tracing::info!(
1124 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1125 : );
1126 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1127 : }
1128 : }
1129 12 : }
1130 12 : }
1131 12 : .in_current_span(),
1132 12 : );
1133 12 :
1134 12 : match rx.await {
1135 12 : Ok(Ok(res)) => Ok(res),
1136 : Ok(Err(remote_storage::DownloadError::Cancelled)) => {
1137 0 : Err(DownloadError::DownloadCancelled)
1138 : }
1139 0 : Ok(Err(_)) => Err(DownloadError::DownloadFailed),
1140 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1141 : }
1142 12 : }
1143 :
1144 12 : async fn download_and_init(
1145 12 : self: &Arc<LayerInner>,
1146 12 : timeline: Arc<Timeline>,
1147 12 : permit: heavier_once_cell::InitPermit,
1148 12 : ctx: &RequestContext,
1149 12 : ) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
1150 12 : let result = timeline
1151 12 : .remote_client
1152 12 : .download_layer_file(
1153 12 : &self.desc.layer_name(),
1154 12 : &self.metadata(),
1155 12 : &self.path,
1156 12 : &timeline.gate,
1157 12 : &timeline.cancel,
1158 12 : ctx,
1159 12 : )
1160 12 : .await;
1161 :
1162 12 : match result {
1163 12 : Ok(size) => {
1164 12 : assert_eq!(size, self.desc.file_size);
1165 :
1166 12 : match self.needs_download().await {
1167 0 : Ok(Some(reason)) => {
1168 0 : // this is really a bug in needs_download or remote timeline client
1169 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1170 : }
1171 12 : Ok(None) => {
1172 12 : // as expected
1173 12 : }
1174 0 : Err(e) => {
1175 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1176 : }
1177 : }
1178 :
1179 12 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1180 12 : timeline
1181 12 : .metrics
1182 12 : .resident_physical_size_add(self.desc.file_size);
1183 12 : self.consecutive_failures.store(0, Ordering::Relaxed);
1184 12 :
1185 12 : let since_last_eviction = self
1186 12 : .last_evicted_at
1187 12 : .lock()
1188 12 : .unwrap()
1189 12 : .take()
1190 12 : .map(|ts| ts.elapsed());
1191 12 : if let Some(since_last_eviction) = since_last_eviction {
1192 12 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1193 12 : }
1194 :
1195 12 : self.access_stats.record_residence_event();
1196 12 :
1197 12 : Ok(self.initialize_after_layer_is_on_disk(permit))
1198 : }
1199 0 : Err(e) => {
1200 0 : let consecutive_failures =
1201 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1202 0 :
1203 0 : if timeline.cancel.is_cancelled() {
1204 : // If we're shutting down, drop out before logging the error
1205 0 : return Err(e);
1206 0 : }
1207 0 :
1208 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1209 :
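// back off before surfacing the error: the helper presumably scales the delay
// exponentially with `consecutive_failures`, with the 1.5 and 60.0 arguments acting
// as growth factor and cap in seconds; the sleep below also races against
// timeline cancellation.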
1210 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1211 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1212 0 : 1.5,
1213 0 : 60.0,
1214 0 : );
1215 0 :
1216 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1217 0 :
1218 0 : tokio::select! {
1219 0 : _ = tokio::time::sleep(backoff) => {},
1220 0 : _ = timeline.cancel.cancelled() => {},
1221 : };
1222 :
1223 0 : Err(e)
1224 : }
1225 : }
1226 12 : }
1227 :
1228 : /// Initializes the `Self::inner` to a "resident" state.
1229 : ///
1230 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1231 : /// before calling this method.
1232 : ///
1233 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1234 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1235 24 : fn initialize_after_layer_is_on_disk(
1236 24 : self: &Arc<LayerInner>,
1237 24 : permit: heavier_once_cell::InitPermit,
1238 24 : ) -> Arc<DownloadedLayer> {
1239 24 : debug_assert_current_span_has_tenant_and_timeline_id();
1240 24 :
1241 24 : // disable any scheduled but not yet running eviction deletions for this initialization
1242 24 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1243 24 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1244 24 :
1245 24 : let res = Arc::new(DownloadedLayer {
1246 24 : owner: Arc::downgrade(self),
1247 24 : kind: tokio::sync::OnceCell::default(),
1248 24 : version: next_version,
1249 24 : });
1250 24 :
1251 24 : let waiters = self.inner.initializer_count();
1252 24 : if waiters > 0 {
1253 0 : tracing::info!(waiters, "completing layer init for other tasks");
1254 24 : }
1255 :
1256 24 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1257 24 :
1258 24 : self.inner.set(value, permit);
1259 24 :
1260 24 : res
1261 24 : }
1262 :
1263 656 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1264 656 : match tokio::fs::metadata(&self.path).await {
1265 640 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1266 16 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1267 0 : Err(e) => Err(e),
1268 : }
1269 656 : }
1270 :
1271 244 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1272 244 : match self.path.metadata() {
1273 244 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1274 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1275 0 : Err(e) => Err(e),
1276 : }
1277 244 : }
1278 :
1279 884 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1280 884 : // in future, this should include sha2-256 validation of the file.
1281 884 : if !m.is_file() {
1282 0 : Err(NeedsDownload::NotFile(m.file_type()))
1283 884 : } else if m.len() != self.desc.file_size {
1284 0 : Err(NeedsDownload::WrongSize {
1285 0 : actual: m.len(),
1286 0 : expected: self.desc.file_size,
1287 0 : })
1288 : } else {
1289 884 : Ok(())
1290 : }
1291 884 : }
1292 :
1293 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1294 0 : let layer_name = self.desc.layer_name().to_string();
1295 0 :
1296 0 : let resident = self
1297 0 : .inner
1298 0 : .get()
1299 0 : .map(|rowe| rowe.is_likely_resident())
1300 0 : .unwrap_or(false);
1301 0 :
1302 0 : let access_stats = self.access_stats.as_api_model(reset);
1303 0 :
1304 0 : if self.desc.is_delta {
1305 0 : let lsn_range = &self.desc.lsn_range;
1306 0 :
1307 0 : HistoricLayerInfo::Delta {
1308 0 : layer_file_name: layer_name,
1309 0 : layer_file_size: self.desc.file_size,
1310 0 : lsn_start: lsn_range.start,
1311 0 : lsn_end: lsn_range.end,
1312 0 : remote: !resident,
1313 0 : access_stats,
1314 0 : l0: crate::tenant::layer_map::LayerMap::is_l0(
1315 0 : &self.layer_desc().key_range,
1316 0 : self.layer_desc().is_delta,
1317 0 : ),
1318 0 : }
1319 : } else {
1320 0 : let lsn = self.desc.image_layer_lsn();
1321 0 :
1322 0 : HistoricLayerInfo::Image {
1323 0 : layer_file_name: layer_name,
1324 0 : layer_file_size: self.desc.file_size,
1325 0 : lsn_start: lsn,
1326 0 : remote: !resident,
1327 0 : access_stats,
1328 0 : }
1329 : }
1330 0 : }
1331 :
1332 : /// `DownloadedLayer` is being dropped, so it calls this method.
1333 48 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1334 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1335 : // though here it is very likely
1336 48 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1337 :
1338 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1339 : // drop while the `self.inner` is being locked, leading to a deadlock.
1340 :
1341 48 : let start_evicting = async move {
1342 48 : #[cfg(test)]
1343 48 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1344 48 : .await
1345 48 : .expect("failpoint should not have errored");
1346 48 :
1347 48 : tracing::debug!("eviction started");
1348 :
1349 48 : let res = self.wait_for_turn_and_evict(only_version).await;
1350 : // metrics: ignore the Ok branch, it is not done yet
1351 48 : if let Err(e) = res {
1352 12 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1353 12 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1354 36 : }
1355 48 : };
1356 :
1357 48 : Self::spawn(start_evicting.instrument(span));
1358 48 : }
1359 :
1360 48 : async fn wait_for_turn_and_evict(
1361 48 : self: Arc<LayerInner>,
1362 48 : only_version: usize,
1363 48 : ) -> Result<(), EvictionCancelled> {
1364 92 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1365 : use Status::*;
1366 92 : match status {
1367 88 : Resident => Ok(()),
1368 4 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1369 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1370 : }
1371 92 : }
1372 :
1373 48 : let timeline = self
1374 48 : .timeline
1375 48 : .upgrade()
1376 48 : .ok_or(EvictionCancelled::TimelineGone)?;
1377 :
1378 48 : let mut rx = self
1379 48 : .status
1380 48 : .as_ref()
1381 48 : .expect("LayerInner cannot be dropped, holding strong ref")
1382 48 : .subscribe();
1383 48 :
1384 48 : is_good_to_continue(&rx.borrow_and_update())?;
1385 :
1386 44 : let Ok(gate) = timeline.gate.enter() else {
1387 0 : return Err(EvictionCancelled::TimelineGone);
1388 : };
1389 :
1390 36 : let permit = {
1391 : // we cannot just `std::fs::remove_file` because there might already be an
1392 : // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
1393 : // operations must be done while holding the heavier_once_cell::InitPermit
1394 44 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1395 :
1396 44 : let waited = loop {
1397 : // we must win the race against Downloading starting, otherwise we would have to wait until the
1398 : // completion of the download. waiting for download could be long and hinder our
1399 : // efforts to alert on "hanging" evictions.
1400 44 : tokio::select! {
1401 44 : res = &mut wait => break res,
1402 44 : _ = rx.changed() => {
1403 0 : is_good_to_continue(&rx.borrow_and_update())?;
1404 : // two possibilities for Status::Resident:
1405 : // - the layer was found locally from disk by a read
1406 : // - we missed a bunch of updates and now the layer is
1407 : // again downloaded -- assume we'll fail later on with
1408 : // version check or AlreadyReinitialized
1409 : }
1410 : }
1411 : };
1412 :
1413 : // re-check now that we have the guard or permit; all updates should have happened
1414 : // while holding the permit.
1415 44 : is_good_to_continue(&rx.borrow_and_update())?;
1416 :
1417 : // the term deinitialize is used here because clearing out the Weak will eventually
1418 : // lead to deallocating the reference counted value, and the value we take with
1419 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1420 44 : let (_weak, permit) = match waited {
1421 40 : Ok(guard) => {
1422 40 : match &*guard {
1423 36 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1424 36 : if *version == only_version =>
1425 32 : {
1426 32 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1427 32 : let (weak, permit) = guard.take_and_deinit();
1428 32 : (Some(weak), permit)
1429 : }
1430 4 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1431 4 : // if we were not doing the version check, we would need to try to
1432 4 : // upgrade the weak here to see if it really is dropped. version check
1433 4 : // is done instead assuming that it is cheaper.
1434 4 : tracing::debug!(
1435 : version,
1436 : only_version,
1437 0 : "version mismatch, not deinitializing"
1438 : );
1439 4 : return Err(EvictionCancelled::VersionCheckFailed);
1440 : }
1441 : ResidentOrWantedEvicted::Resident(_) => {
1442 4 : return Err(EvictionCancelled::AlreadyReinitialized);
1443 : }
1444 : }
1445 : }
1446 4 : Err(permit) => {
1447 4 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1448 4 : (None, permit)
1449 : }
1450 : };
1451 :
1452 36 : permit
1453 36 : };
1454 36 :
1455 36 : let span = tracing::Span::current();
1456 36 :
1457 36 : let spawned_at = std::time::Instant::now();
1458 36 :
1459 36 : // this is on purpose a detached spawn; we don't need to wait for it
1460 36 : //
1461 36 : // eviction completion reporting is the only thing hinging on this, and it can be done just as
1462 36 : // well from a spawn_blocking thread.
1463 36 : //
1464 36 : // important to note that now that we've acquired the permit we have made sure the evicted
1465 36 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1466 36 : // there are multiple evictions. The rest is not cancellable, and we've now committed to
1467 36 : // evicting.
1468 36 : //
1469 36 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1470 36 : // reads. We will need to add cancellation for that if necessary.
1471 36 : Self::spawn_blocking(move || {
1472 36 : let _span = span.entered();
1473 36 :
1474 36 : let res = self.evict_blocking(&timeline, &gate, &permit);
1475 36 :
1476 36 : let waiters = self.inner.initializer_count();
1477 36 :
1478 36 : if waiters > 0 {
1479 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1480 36 : }
1481 :
1482 36 : let completed_in = spawned_at.elapsed();
1483 36 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1484 36 :
1485 36 : match res {
1486 36 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1487 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1488 : }
1489 :
1490 36 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1491 36 : });
1492 36 :
1493 36 : Ok(())
1494 48 : }
1495 :
1496 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1497 36 : fn evict_blocking(
1498 36 : &self,
1499 36 : timeline: &Timeline,
1500 36 : _gate: &gate::GateGuard,
1501 36 : _permit: &heavier_once_cell::InitPermit,
1502 36 : ) -> Result<(), EvictionCancelled> {
1503 36 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1504 36 :
1505 36 : match capture_mtime_and_remove(&self.path) {
1506 36 : Ok(local_layer_mtime) => {
1507 36 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1508 36 : match duration {
1509 36 : Ok(elapsed) => {
1510 36 : let accessed_and_visible = self.access_stats.accessed()
1511 8 : && self.access_stats.visibility() == LayerVisibilityHint::Visible;
1512 36 : if accessed_and_visible {
1513 8 : // Only layers used for reads contribute to our "low residence" metric that is used
1514 8 : // to detect thrashing. Layers promoted for other reasons (e.g. compaction) are allowed
1515 8 : // to be rapidly evicted without contributing to this metric.
1516 8 : timeline
1517 8 : .metrics
1518 8 : .evictions_with_low_residence_duration
1519 8 : .read()
1520 8 : .unwrap()
1521 8 : .observe(elapsed);
1522 28 : }
1523 :
1524 36 : tracing::info!(
1525 0 : residence_millis = elapsed.as_millis(),
1526 0 : accessed_and_visible,
1527 0 : "evicted layer after known residence period"
1528 : );
1529 : }
1530 : Err(_) => {
1531 0 : tracing::info!("evicted layer after unknown residence period");
1532 : }
1533 : }
1534 36 : timeline.metrics.evictions.inc();
1535 36 : timeline
1536 36 : .metrics
1537 36 : .resident_physical_size_sub(self.desc.file_size);
1538 : }
1539 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1540 0 : tracing::error!(
1541 : layer_size = %self.desc.file_size,
1542 0 : "failed to evict layer from disk, it was already gone"
1543 : );
1544 0 : return Err(EvictionCancelled::FileNotFound);
1545 : }
1546 0 : Err(e) => {
1547 0 : // FIXME: this should probably be an abort
1548 0 : tracing::error!("failed to evict file from disk: {e:#}");
1549 0 : return Err(EvictionCancelled::RemoveFailed);
1550 : }
1551 : }
1552 :
1553 36 : self.access_stats.record_residence_event();
1554 36 :
1555 36 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1556 36 :
1557 36 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1558 36 :
1559 36 : Ok(())
1560 36 : }
1561 :
1562 5724 : fn metadata(&self) -> LayerFileMetadata {
1563 5724 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1564 5724 : }
1565 :
1566 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1567 : ///
1568 : /// Synchronizing with spawned tasks is very complicated otherwise.
1569 60 : fn spawn<F>(fut: F)
1570 60 : where
1571 60 : F: std::future::Future<Output = ()> + Send + 'static,
1572 60 : {
1573 60 : #[cfg(test)]
1574 60 : tokio::task::spawn(fut);
1575 60 : #[cfg(not(test))]
1576 60 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1577 60 : }
1578 :
1579 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1580 1060 : fn spawn_blocking<F>(f: F)
1581 1060 : where
1582 1060 : F: FnOnce() + Send + 'static,
1583 1060 : {
1584 1060 : #[cfg(test)]
1585 1060 : tokio::task::spawn_blocking(f);
1586 1060 : #[cfg(not(test))]
1587 1060 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1588 1060 : }
1589 : }
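// Illustration (hedged sketch, not part of the pageserver source): the cfg(test) runtime
// dispatch used by `spawn` and `spawn_blocking` above, with a hypothetical
// `EXAMPLE_BACKGROUND_RT` standing in for `crate::task_mgr::BACKGROUND_RUNTIME`. In tests
// the task runs on the already-entered runtime so tests can synchronize with it.

static EXAMPLE_BACKGROUND_RT: once_cell::sync::Lazy<tokio::runtime::Runtime> =
    once_cell::sync::Lazy::new(|| {
        tokio::runtime::Runtime::new().expect("create example background runtime")
    });

fn spawn_detached_example<F>(fut: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    // in tests, use the runtime the test has already entered
    #[cfg(test)]
    tokio::task::spawn(fut);
    // outside tests, push the work to the dedicated background runtime
    #[cfg(not(test))]
    EXAMPLE_BACKGROUND_RT.spawn(fut);
}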
1590 :
1591 36 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1592 36 : let m = path.metadata()?;
1593 36 : let local_layer_mtime = m.modified()?;
1594 36 : std::fs::remove_file(path)?;
1595 36 : Ok(local_layer_mtime)
1596 36 : }
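// Illustration (hedged sketch, not part of the pageserver source): how the mtime captured
// by `capture_mtime_and_remove` becomes the residence duration that `evict_blocking`
// observes. The temp-dir path and the function name are assumptions made for the example.

fn residence_duration_example() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("example-layer-file");
    std::fs::write(&path, b"layer bytes")?;

    // capture the modification time before removing, like the helper above does
    let mtime = path.metadata()?.modified()?;
    std::fs::remove_file(&path)?;

    match std::time::SystemTime::now().duration_since(mtime) {
        // a well-behaved clock yields the known residence period
        Ok(elapsed) => println!("resident for {} ms", elapsed.as_millis()),
        // duration_since fails if the clock moved backwards past the mtime
        Err(_) => println!("residence period unknown"),
    }
    Ok(())
}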
1597 :
1598 : #[derive(Debug, thiserror::Error)]
1599 : pub(crate) enum EvictionError {
1600 : #[error("layer was already evicted")]
1601 : NotFound,
1602 :
1603 : /// Evictions must always lose to downloads in races, and this time it happened.
1604 : #[error("layer was downloaded instead")]
1605 : Downloaded,
1606 :
1607 : #[error("eviction did not happen within timeout")]
1608 : Timeout,
1609 : }
1610 :
1611 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1612 : #[derive(Debug, thiserror::Error)]
1613 : pub(crate) enum DownloadError {
1614 : #[error("timeline has already shutdown")]
1615 : TimelineShutdown,
1616 : #[error("context denies downloading")]
1617 : ContextAndConfigReallyDeniesDownloads,
1618 : #[error("downloading is really required but not allowed by this method")]
1619 : DownloadRequired,
1620 : #[error("layer path exists, but it is not a file: {0:?}")]
1621 : NotFile(std::fs::FileType),
1622 : /// Why no error here? Because it will be reported by page_service. We should have also done
1623 : /// retries already.
1624 : #[error("downloading evicted layer file failed")]
1625 : DownloadFailed,
1626 : #[error("downloading failed, possibly for shutdown")]
1627 : DownloadCancelled,
1628 : #[error("pre-condition: stat before download failed")]
1629 : PreStatFailed(#[source] std::io::Error),
1630 :
1631 : #[cfg(test)]
1632 : #[error("failpoint: {0:?}")]
1633 : Failpoint(failpoints::FailpointKind),
1634 : }
1635 :
1636 : impl DownloadError {
1637 0 : pub(crate) fn is_cancelled(&self) -> bool {
1638 0 : matches!(self, DownloadError::DownloadCancelled)
1639 0 : }
1640 : }
1641 :
1642 : #[derive(Debug, PartialEq)]
1643 : pub(crate) enum NeedsDownload {
1644 : NotFound,
1645 : NotFile(std::fs::FileType),
1646 : WrongSize { actual: u64, expected: u64 },
1647 : }
1648 :
1649 : impl std::fmt::Display for NeedsDownload {
1650 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1651 12 : match self {
1652 12 : NeedsDownload::NotFound => write!(f, "file was not found"),
1653 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1654 0 : NeedsDownload::WrongSize { actual, expected } => {
1655 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1656 : }
1657 : }
1658 12 : }
1659 : }
1660 :
1661 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1662 : pub(crate) struct DownloadedLayer {
1663 : owner: Weak<LayerInner>,
1664 : // Use tokio OnceCell as we do not need to deinitialize this; it'll just get dropped with the
1665 : // DownloadedLayer
1666 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
1667 : version: usize,
1668 : }
1669 :
1670 : impl std::fmt::Debug for DownloadedLayer {
1671 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1672 0 : f.debug_struct("DownloadedLayer")
1673 0 : // owner omitted because it is always "Weak"
1674 0 : .field("kind", &self.kind)
1675 0 : .field("version", &self.version)
1676 0 : .finish()
1677 0 : }
1678 : }
1679 :
1680 : impl Drop for DownloadedLayer {
1681 1460 : fn drop(&mut self) {
1682 1460 : if let Some(owner) = self.owner.upgrade() {
1683 48 : owner.on_downloaded_layer_drop(self.version);
1684 1412 : } else {
1685 1412 : // Layer::drop will handle cancelling the eviction; because of drop order and
1686 1412 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1687 1412 : }
1688 1460 : }
1689 : }
1690 :
1691 : impl DownloadedLayer {
1692 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`].
1693 : /// Failure to load the layer is sticky, i.e., future `get()` calls will return
1694 : /// the initial load failure immediately.
1695 : ///
1696 : /// The `owner` parameter is a strong reference to the same `LayerInner` as the
1697 : /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up being called,
1698 : /// we will always have the LayerInner on the callstack, so we can just use it.
1699 482278 : async fn get<'a>(
1700 482278 : &'a self,
1701 482278 : owner: &Arc<LayerInner>,
1702 482278 : ctx: &RequestContext,
1703 482278 : ) -> anyhow::Result<&'a LayerKind> {
1704 482278 : let init = || async {
1705 2456 : assert_eq!(
1706 2456 : Weak::as_ptr(&self.owner),
1707 2456 : Arc::as_ptr(owner),
1708 0 : "these are the same, just avoiding the upgrade"
1709 : );
1710 :
1711 2456 : let res = if owner.desc.is_delta {
1712 2188 : let ctx = RequestContextBuilder::extend(ctx)
1713 2188 : .page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
1714 2188 : .build();
1715 2188 : let summary = Some(delta_layer::Summary::expected(
1716 2188 : owner.desc.tenant_shard_id.tenant_id,
1717 2188 : owner.desc.timeline_id,
1718 2188 : owner.desc.key_range.clone(),
1719 2188 : owner.desc.lsn_range.clone(),
1720 2188 : ));
1721 2188 : delta_layer::DeltaLayerInner::load(
1722 2188 : &owner.path,
1723 2188 : summary,
1724 2188 : Some(owner.conf.max_vectored_read_bytes),
1725 2188 : &ctx,
1726 2188 : )
1727 2188 : .await
1728 2188 : .map(LayerKind::Delta)
1729 : } else {
1730 268 : let ctx = RequestContextBuilder::extend(ctx)
1731 268 : .page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
1732 268 : .build();
1733 268 : let lsn = owner.desc.image_layer_lsn();
1734 268 : let summary = Some(image_layer::Summary::expected(
1735 268 : owner.desc.tenant_shard_id.tenant_id,
1736 268 : owner.desc.timeline_id,
1737 268 : owner.desc.key_range.clone(),
1738 268 : lsn,
1739 268 : ));
1740 268 : image_layer::ImageLayerInner::load(
1741 268 : &owner.path,
1742 268 : lsn,
1743 268 : summary,
1744 268 : Some(owner.conf.max_vectored_read_bytes),
1745 268 : &ctx,
1746 268 : )
1747 268 : .await
1748 268 : .map(LayerKind::Image)
1749 : };
1750 :
1751 2456 : match res {
1752 2456 : Ok(layer) => Ok(layer),
1753 0 : Err(err) => {
1754 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1755 0 : // We log this message once over the lifetime of `Self`
1756 0 : // => Ok and good to log backtrace and path here.
1757 0 : tracing::error!(
1758 0 : "layer load failed, assuming permanent failure: {}: {err:?}",
1759 0 : owner.path
1760 : );
1761 0 : Err(err)
1762 : }
1763 : }
1764 4912 : };
1765 482278 : self.kind
1766 482278 : .get_or_init(init)
1767 482278 : .await
1768 482278 : .as_ref()
1769 482278 : // We already logged the full backtrace above, once. Don't repeat that here.
1770 482278 : .map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
1771 482278 : }
1772 :
1773 480190 : async fn get_values_reconstruct_data(
1774 480190 : &self,
1775 480190 : this: ResidentLayer,
1776 480190 : keyspace: KeySpace,
1777 480190 : lsn_range: Range<Lsn>,
1778 480190 : reconstruct_data: &mut ValuesReconstructState,
1779 480190 : ctx: &RequestContext,
1780 480190 : ) -> Result<(), GetVectoredError> {
1781 : use LayerKind::*;
1782 :
1783 480190 : match self
1784 480190 : .get(&this.owner.0, ctx)
1785 480190 : .await
1786 480190 : .map_err(GetVectoredError::Other)?
1787 : {
1788 435074 : Delta(d) => {
1789 435074 : d.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
1790 435074 : .await
1791 : }
1792 45116 : Image(i) => {
1793 45116 : i.get_values_reconstruct_data(this, keyspace, reconstruct_data, ctx)
1794 45116 : .await
1795 : }
1796 : }
1797 480190 : }
1798 :
1799 8 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1800 : use LayerKind::*;
1801 8 : match self.get(owner, ctx).await? {
1802 8 : Delta(d) => d.dump(ctx).await?,
1803 0 : Image(i) => i.dump(ctx).await?,
1804 : }
1805 :
1806 8 : Ok(())
1807 8 : }
1808 : }
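// Illustration (hedged sketch, not part of the pageserver source): the "sticky failure"
// caching that `DownloadedLayer::get` builds on. A tokio OnceCell stores the first
// initialization result, success or failure, and later callers only receive a short
// error that refers back to the original failure. Type and method names are invented.

struct StickyInit {
    slot: tokio::sync::OnceCell<anyhow::Result<String>>,
}

impl StickyInit {
    async fn get(&self) -> anyhow::Result<&str> {
        let init = || async {
            // stand-in for an expensive, fallible load; an Err here is cached too
            Ok::<_, anyhow::Error>("loaded".to_string())
        };
        self.slot
            .get_or_init(init)
            .await
            .as_ref()
            .map(|s| s.as_str())
            // the full error would have been logged once at init time; later
            // callers only get this shorter reference back to it
            .map_err(|e| anyhow::anyhow!("load failed earlier: {e}"))
    }
}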
1809 :
1810 : /// Wrapper around an actual layer implementation.
1811 : #[derive(Debug)]
1812 : enum LayerKind {
1813 : Delta(delta_layer::DeltaLayerInner),
1814 : Image(image_layer::ImageLayerInner),
1815 : }
1816 :
1817 : /// Guard for forcing a layer to be resident while it exists.
1818 : #[derive(Clone)]
1819 : pub struct ResidentLayer {
1820 : owner: Layer,
1821 : downloaded: Arc<DownloadedLayer>,
1822 : }
1823 :
1824 : impl std::fmt::Display for ResidentLayer {
1825 4260 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1826 4260 : write!(f, "{}", self.owner)
1827 4260 : }
1828 : }
1829 :
1830 : impl std::fmt::Debug for ResidentLayer {
1831 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1832 0 : write!(f, "{}", self.owner)
1833 0 : }
1834 : }
1835 :
1836 : impl ResidentLayer {
1837 : /// Release the eviction guard, converting back into a plain [`Layer`].
1838 : ///
1839 : /// You can access the [`Layer`] also by using `as_ref`.
1840 840 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1841 840 : self.into()
1842 840 : }
1843 :
1844 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1845 0 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1846 : pub(crate) async fn load_keys<'a>(
1847 : &'a self,
1848 : ctx: &RequestContext,
1849 : ) -> anyhow::Result<Vec<pageserver_api::key::Key>> {
1850 : use LayerKind::*;
1851 :
1852 : let owner = &self.owner.0;
1853 : let inner = self.downloaded.get(owner, ctx).await?;
1854 :
1855 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1856 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1857 : // while it's being held.
1858 : self.owner.record_access(ctx);
1859 :
1860 : let res = match inner {
1861 : Delta(ref d) => delta_layer::DeltaLayerInner::load_keys(d, ctx).await,
1862 : Image(ref i) => image_layer::ImageLayerInner::load_keys(i, ctx).await,
1863 : };
1864 0 : res.with_context(|| format!("Layer index is corrupted for {self}"))
1865 : }
1866 :
1867 : /// Read all the keys in this layer which match the ShardIdentity, and write them all to
1868 : /// the provided writer. Return the number of keys written.
1869 16 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1870 : pub(crate) async fn filter(
1871 : &self,
1872 : shard_identity: &ShardIdentity,
1873 : writer: &mut ImageLayerWriter,
1874 : ctx: &RequestContext,
1875 : ) -> Result<usize, CompactionError> {
1876 : use LayerKind::*;
1877 :
1878 : match self
1879 : .downloaded
1880 : .get(&self.owner.0, ctx)
1881 : .await
1882 : .map_err(CompactionError::Other)?
1883 : {
1884 : Delta(_) => {
1885 : return Err(CompactionError::Other(anyhow::anyhow!(format!(
1886 : "cannot filter() on a delta layer {self}"
1887 : ))));
1888 : }
1889 : Image(i) => i
1890 : .filter(shard_identity, writer, ctx)
1891 : .await
1892 : .map_err(CompactionError::Other),
1893 : }
1894 : }
1895 :
1896 : /// Returns the number of keys and values written to the writer.
1897 20 : pub(crate) async fn copy_delta_prefix(
1898 20 : &self,
1899 20 : writer: &mut super::delta_layer::DeltaLayerWriter,
1900 20 : until: Lsn,
1901 20 : ctx: &RequestContext,
1902 20 : ) -> anyhow::Result<usize> {
1903 : use LayerKind::*;
1904 :
1905 20 : let owner = &self.owner.0;
1906 20 :
1907 20 : match self.downloaded.get(owner, ctx).await? {
1908 20 : Delta(ref d) => d
1909 20 : .copy_prefix(writer, until, ctx)
1910 20 : .await
1911 20 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1912 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1913 : }
1914 20 : }
1915 :
1916 4105 : pub(crate) fn local_path(&self) -> &Utf8Path {
1917 4105 : &self.owner.0.path
1918 4105 : }
1919 :
1920 3876 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1921 3876 : self.owner.metadata()
1922 3876 : }
1923 :
1924 : /// Cast the layer to a delta, return an error if it is an image layer.
1925 1908 : pub(crate) async fn get_as_delta(
1926 1908 : &self,
1927 1908 : ctx: &RequestContext,
1928 1908 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1929 : use LayerKind::*;
1930 1908 : match self.downloaded.get(&self.owner.0, ctx).await? {
1931 1908 : Delta(ref d) => Ok(d),
1932 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1933 : }
1934 1908 : }
1935 :
1936 : /// Cast the layer to an image, return an error if it is a delta layer.
1937 136 : pub(crate) async fn get_as_image(
1938 136 : &self,
1939 136 : ctx: &RequestContext,
1940 136 : ) -> anyhow::Result<&image_layer::ImageLayerInner> {
1941 : use LayerKind::*;
1942 136 : match self.downloaded.get(&self.owner.0, ctx).await? {
1943 136 : Image(ref d) => Ok(d),
1944 0 : Delta(_) => Err(anyhow::anyhow!("delta layer")),
1945 : }
1946 136 : }
1947 : }
1948 :
1949 : impl AsLayerDesc for ResidentLayer {
1950 2382061 : fn layer_desc(&self) -> &PersistentLayerDesc {
1951 2382061 : self.owner.layer_desc()
1952 2382061 : }
1953 : }
1954 :
1955 : impl AsRef<Layer> for ResidentLayer {
1956 4056 : fn as_ref(&self) -> &Layer {
1957 4056 : &self.owner
1958 4056 : }
1959 : }
1960 :
1961 : /// Drop the eviction guard.
1962 : impl From<ResidentLayer> for Layer {
1963 840 : fn from(value: ResidentLayer) -> Self {
1964 840 : value.owner
1965 840 : }
1966 : }
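// Illustration (hedged sketch, not part of the pageserver source): the guard shape behind
// `ResidentLayer`. Holding the guard keeps a strong Arc to the downloaded contents so the
// layer stays resident; converting back to the plain handle drops that strong reference
// and with it the residency guarantee. All names below are invented for the example.

#[derive(Clone)]
struct ExampleHandle(std::sync::Arc<String>);

struct ExampleResidentHandle {
    owner: ExampleHandle,
    // strong reference that forces the contents to stay resident
    _downloaded: std::sync::Arc<String>,
}

impl From<ExampleResidentHandle> for ExampleHandle {
    fn from(value: ExampleResidentHandle) -> Self {
        // dropping `_downloaded` here releases the "must stay resident" guarantee
        value.owner
    }
}

impl AsRef<ExampleHandle> for ExampleResidentHandle {
    fn as_ref(&self) -> &ExampleHandle {
        &self.owner
    }
}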
1967 :
1968 : use metrics::IntCounter;
1969 :
1970 : pub(crate) struct LayerImplMetrics {
1971 : started_evictions: IntCounter,
1972 : completed_evictions: IntCounter,
1973 : cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
1974 :
1975 : started_deletes: IntCounter,
1976 : completed_deletes: IntCounter,
1977 : failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
1978 :
1979 : rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
1980 : inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
1981 : redownload_after: metrics::Histogram,
1982 : time_to_evict: metrics::Histogram,
1983 : }
1984 :
1985 : impl Default for LayerImplMetrics {
1986 100 : fn default() -> Self {
1987 : use enum_map::Enum;
1988 :
1989 : // reminder: these will be pageserver_layer_* with "_total" suffix
1990 :
1991 100 : let started_evictions = metrics::register_int_counter!(
1992 100 : "pageserver_layer_started_evictions",
1993 100 : "Evictions started in the Layer implementation"
1994 100 : )
1995 100 : .unwrap();
1996 100 : let completed_evictions = metrics::register_int_counter!(
1997 100 : "pageserver_layer_completed_evictions",
1998 100 : "Evictions completed in the Layer implementation"
1999 100 : )
2000 100 : .unwrap();
2001 100 :
2002 100 : let cancelled_evictions = metrics::register_int_counter_vec!(
2003 100 : "pageserver_layer_cancelled_evictions_count",
2004 100 : "Different reasons for evictions to have been cancelled or failed",
2005 100 : &["reason"]
2006 100 : )
2007 100 : .unwrap();
2008 100 :
2009 900 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2010 900 : let reason = EvictionCancelled::from_usize(i);
2011 900 : let s = reason.as_str();
2012 900 : cancelled_evictions.with_label_values(&[s])
2013 900 : }));
2014 100 :
2015 100 : let started_deletes = metrics::register_int_counter!(
2016 100 : "pageserver_layer_started_deletes",
2017 100 : "Deletions on drop pending in the Layer implementation"
2018 100 : )
2019 100 : .unwrap();
2020 100 : let completed_deletes = metrics::register_int_counter!(
2021 100 : "pageserver_layer_completed_deletes",
2022 100 : "Deletions on drop completed in the Layer implementation"
2023 100 : )
2024 100 : .unwrap();
2025 100 :
2026 100 : let failed_deletes = metrics::register_int_counter_vec!(
2027 100 : "pageserver_layer_failed_deletes_count",
2028 100 : "Different reasons for deletions on drop to have failed",
2029 100 : &["reason"]
2030 100 : )
2031 100 : .unwrap();
2032 100 :
2033 200 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2034 200 : let reason = DeleteFailed::from_usize(i);
2035 200 : let s = reason.as_str();
2036 200 : failed_deletes.with_label_values(&[s])
2037 200 : }));
2038 100 :
2039 100 : let rare_counters = metrics::register_int_counter_vec!(
2040 100 : "pageserver_layer_assumed_rare_count",
2041 100 : "Times unexpected or assumed rare event happened",
2042 100 : &["event"]
2043 100 : )
2044 100 : .unwrap();
2045 100 :
2046 700 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2047 700 : let event = RareEvent::from_usize(i);
2048 700 : let s = event.as_str();
2049 700 : rare_counters.with_label_values(&[s])
2050 700 : }));
2051 100 :
2052 100 : let inits_cancelled = metrics::register_int_counter!(
2053 100 : "pageserver_layer_inits_cancelled_count",
2054 100 : "Times Layer initialization was cancelled",
2055 100 : )
2056 100 : .unwrap();
2057 100 :
2058 100 : let redownload_after = {
2059 100 : let minute = 60.0;
2060 100 : let hour = 60.0 * minute;
2061 100 : metrics::register_histogram!(
2062 100 : "pageserver_layer_redownloaded_after",
2063 100 : "Time between evicting and re-downloading.",
2064 100 : vec![
2065 100 : 10.0,
2066 100 : 30.0,
2067 100 : minute,
2068 100 : 5.0 * minute,
2069 100 : 15.0 * minute,
2070 100 : 30.0 * minute,
2071 100 : hour,
2072 100 : 12.0 * hour,
2073 100 : ]
2074 100 : )
2075 100 : .unwrap()
2076 100 : };
2077 100 :
2078 100 : let time_to_evict = metrics::register_histogram!(
2079 100 : "pageserver_layer_eviction_held_permit_seconds",
2080 100 : "Time eviction held the permit.",
2081 100 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
2082 100 : )
2083 100 : .unwrap();
2084 100 :
2085 100 : Self {
2086 100 : started_evictions,
2087 100 : completed_evictions,
2088 100 : cancelled_evictions,
2089 100 :
2090 100 : started_deletes,
2091 100 : completed_deletes,
2092 100 : failed_deletes,
2093 100 :
2094 100 : rare_counters,
2095 100 : inits_cancelled,
2096 100 : redownload_after,
2097 100 : time_to_evict,
2098 100 : }
2099 100 : }
2100 : }
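// Illustration (hedged sketch, not part of the pageserver source): the enum_map pattern
// used above to get one labelled counter per enum variant. `ExampleReason` and the metric
// name are invented; the metrics macros are the same ones this file already uses.

#[derive(Clone, Copy, enum_map::Enum)]
enum ExampleReason {
    Shutdown,
    Raced,
}

impl ExampleReason {
    fn as_str(&self) -> &'static str {
        match self {
            ExampleReason::Shutdown => "shutdown",
            ExampleReason::Raced => "raced",
        }
    }
}

fn example_reason_counters() -> enum_map::EnumMap<ExampleReason, IntCounter> {
    use enum_map::Enum;

    let vec = metrics::register_int_counter_vec!(
        "pageserver_example_reason_count",
        "Example of one counter per enum variant",
        &["reason"]
    )
    .unwrap();

    // one counter per variant, keyed by the variant's stable label
    enum_map::EnumMap::from_array(std::array::from_fn(|i| {
        let reason = ExampleReason::from_usize(i);
        vec.with_label_values(&[reason.as_str()])
    }))
}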
2101 :
2102 : impl LayerImplMetrics {
2103 52 : fn inc_started_evictions(&self) {
2104 52 : self.started_evictions.inc();
2105 52 : }
2106 36 : fn inc_completed_evictions(&self) {
2107 36 : self.completed_evictions.inc();
2108 36 : }
2109 16 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2110 16 : self.cancelled_evictions[reason].inc()
2111 16 : }
2112 :
2113 1032 : fn inc_started_deletes(&self) {
2114 1032 : self.started_deletes.inc();
2115 1032 : }
2116 1020 : fn inc_completed_deletes(&self) {
2117 1020 : self.completed_deletes.inc();
2118 1020 : }
2119 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2120 0 : self.failed_deletes[reason].inc();
2121 0 : }
2122 :
2123 : /// Counted separately from failed layer deletes because we will complete the layer deletion
2124 : /// attempt regardless of a failure to delete the local file.
2125 0 : fn inc_delete_removes_failed(&self) {
2126 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2127 0 : }
2128 :
2129 : /// Expected to be rare, just as cancellations are rare, but we could have cancellations separate from
2130 : /// the single caller which can start the download, so use this counter to separate them.
2131 0 : fn inc_init_completed_without_requester(&self) {
2132 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2133 0 : }
2134 :
2135 : /// Expected to be rare because both cancellations and failures are unexpected
2136 0 : fn inc_download_failed_without_requester(&self) {
2137 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2138 0 : }
2139 :
2140 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2141 : ///
2142 : /// If this counter is always zero, we should replace the ResidentOrWantedEvicted type with an
2143 : /// Option.
2144 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2145 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2146 0 : }
2147 :
2148 : /// These are only expected to occur up to the [`Self::inc_init_cancelled`] count when
2149 : /// running with remote storage.
2150 12 : fn inc_init_needed_no_download(&self) {
2151 12 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2152 12 : }
2153 :
2154 : /// Expected to be rare because all layer files should be readable and valid
2155 0 : fn inc_permanent_loading_failures(&self) {
2156 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2157 0 : }
2158 :
2159 0 : fn inc_init_cancelled(&self) {
2160 0 : self.inits_cancelled.inc()
2161 0 : }
2162 :
2163 12 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2164 12 : self.redownload_after.observe(duration.as_secs_f64())
2165 12 : }
2166 :
2167 : /// This would be bad if it ever happened, or would mean extreme disk pressure. We should probably
2168 : /// cancel the eviction instead if there would be read waiters. However, we cannot separate reads
2169 : /// from other evictions, so this could have noise as well.
2170 0 : fn inc_evicted_with_waiters(&self) {
2171 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2172 0 : }
2173 :
2174 : /// Recorded at least initially, as the permit is now acquired in an async context before the
2175 : /// spawn_blocking action.
2176 36 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2177 36 : self.time_to_evict.observe(duration.as_secs_f64())
2178 36 : }
2179 : }
2180 :
2181 : #[derive(Debug, Clone, Copy, enum_map::Enum)]
2182 : enum EvictionCancelled {
2183 : LayerGone,
2184 : TimelineGone,
2185 : VersionCheckFailed,
2186 : FileNotFound,
2187 : RemoveFailed,
2188 : AlreadyReinitialized,
2189 : /// Not evicted because of a pending reinitialization
2190 : LostToDownload,
2191 : /// After eviction, there was a new layer access which cancelled the eviction.
2192 : UpgradedBackOnAccess,
2193 : UnexpectedEvictedState,
2194 : }
2195 :
2196 : impl EvictionCancelled {
2197 900 : fn as_str(&self) -> &'static str {
2198 900 : match self {
2199 100 : EvictionCancelled::LayerGone => "layer_gone",
2200 100 : EvictionCancelled::TimelineGone => "timeline_gone",
2201 100 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2202 100 : EvictionCancelled::FileNotFound => "file_not_found",
2203 100 : EvictionCancelled::RemoveFailed => "remove_failed",
2204 100 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2205 100 : EvictionCancelled::LostToDownload => "lost_to_download",
2206 100 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2207 100 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2208 : }
2209 900 : }
2210 : }
2211 :
2212 : #[derive(enum_map::Enum)]
2213 : enum DeleteFailed {
2214 : TimelineGone,
2215 : DeleteSchedulingFailed,
2216 : }
2217 :
2218 : impl DeleteFailed {
2219 200 : fn as_str(&self) -> &'static str {
2220 200 : match self {
2221 100 : DeleteFailed::TimelineGone => "timeline_gone",
2222 100 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2223 : }
2224 200 : }
2225 : }
2226 :
2227 : #[derive(enum_map::Enum)]
2228 : enum RareEvent {
2229 : RemoveOnDropFailed,
2230 : InitCompletedWithoutRequester,
2231 : DownloadFailedWithoutRequester,
2232 : UpgradedWantedEvicted,
2233 : InitWithoutDownload,
2234 : PermanentLoadingFailure,
2235 : EvictedWithWaiters,
2236 : }
2237 :
2238 : impl RareEvent {
2239 700 : fn as_str(&self) -> &'static str {
2240 : use RareEvent::*;
2241 :
2242 700 : match self {
2243 100 : RemoveOnDropFailed => "remove_on_drop_failed",
2244 100 : InitCompletedWithoutRequester => "init_completed_without",
2245 100 : DownloadFailedWithoutRequester => "download_failed_without",
2246 100 : UpgradedWantedEvicted => "raced_wanted_evicted",
2247 100 : InitWithoutDownload => "init_needed_no_download",
2248 100 : PermanentLoadingFailure => "permanent_loading_failure",
2249 100 : EvictedWithWaiters => "evicted_with_waiters",
2250 : }
2251 700 : }
2252 : }
2253 :
2254 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2255 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|