Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::HistoricLayerInfo;
5 : use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
6 : use std::ops::Range;
7 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 : use std::sync::{Arc, Weak};
9 : use std::time::{Duration, SystemTime};
10 : use tracing::Instrument;
11 : use utils::id::TimelineId;
12 : use utils::lsn::Lsn;
13 : use utils::sync::{gate, heavier_once_cell};
14 :
15 : use crate::config::PageServerConf;
16 : use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
17 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
18 : use crate::task_mgr::TaskKind;
19 : use crate::tenant::timeline::{CompactionError, GetVectoredError};
20 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
21 :
22 : use super::delta_layer::{self, DeltaEntry};
23 : use super::image_layer::{self};
24 : use super::{
25 : AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
26 : LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
27 : };
28 :
29 : use utils::generation::Generation;
30 :
31 : #[cfg(test)]
32 : mod tests;
33 :
34 : #[cfg(test)]
35 : mod failpoints;
36 :
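/// 4.5 GB; judging by the constant's name, this is kept below S3's 5 GB limit for a single-part upload.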
37 : pub const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
38 :
39 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
40 : /// range of LSNs.
41 : ///
42 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
43 : /// layers are used to ingest incoming WAL, and provide fast access to the
44 : /// recent page versions. On-disk layers are stored as files on disk, and are
45 : /// immutable. This type represents the on-disk kind, while the in-memory kind is represented by
46 : /// [`InMemoryLayer`].
47 : ///
48 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
49 : /// A delta layer contains all modifications within a range of LSNs and keys.
50 : /// An image layer is a snapshot of all the data in a key-range, at a single
51 : /// LSN.
52 : ///
53 : /// This type models the on-disk layers, which can be evicted and downloaded on demand. As a
54 : /// general goal, read accesses should always win eviction and eviction should not wait for
55 : /// download.
56 : ///
57 : /// ### State transitions
58 : ///
59 : /// The internal state of `Layer` is composed of, most importantly, the on-filesystem state and the
60 : /// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
61 : /// right size) or deleted.
62 : ///
63 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
64 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
65 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
66 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
67 : /// cancel the eviction.
68 : ///
69 : /// ```text
70 : /// +-----------------+ get_or_maybe_download +--------------------------------+
71 : /// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
72 : /// | ENOENT | /->| |
73 : /// +-----------------+ | +--------------------------------+
74 : /// ^ | | ^
75 : /// | get_or_maybe_download | | | get_or_maybe_download, either:
76 : /// evict_blocking | /-------------------------/ | | - upgrade weak to strong
77 : /// | | | | - re-initialize without download
78 : /// | | evict_and_wait | |
79 : /// +-----------------+ v |
80 : /// | not initialized | on_downloaded_layer_drop +--------------------------------------+
81 : /// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
82 : /// +-----------------+ +--------------------------------------+
83 : /// ```
84 : ///
85 : /// ### Unsupported
86 : ///
87 : /// - Evicting by the operator deleting files from the filesystem
88 : ///
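/// ### Example
///
/// An illustrative sketch (not part of the real code), assuming a `layer: &Layer`, a
/// `keyspace`, `lsn_range`, `reconstruct_state` and `ctx` already in scope inside this
/// crate: a read is never blocked by a pending eviction request, and the eviction only
/// proceeds once the last strong reference to the downloaded file has been dropped.
///
/// ```ignore
/// let read = layer.get_values_reconstruct_data(keyspace, lsn_range, &mut reconstruct_state, ctx);
/// let evict = layer.evict_and_wait(Duration::from_secs(10));
/// let (read_res, evict_res) = tokio::join!(read, evict);
/// // `read_res` is served from the resident file; `evict_res` reports whether the
/// // eviction ran, lost to a (re-)download, or timed out.
/// ```
///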
89 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
90 : #[derive(Clone)]
91 : pub(crate) struct Layer(Arc<LayerInner>);
92 :
93 : impl std::fmt::Display for Layer {
94 5760 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 5760 : write!(
96 5760 : f,
97 5760 : "{}{}",
98 5760 : self.layer_desc().short_id(),
99 5760 : self.0.generation.get_suffix()
100 5760 : )
101 5760 : }
102 : }
103 :
104 : impl std::fmt::Debug for Layer {
105 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 0 : write!(f, "{}", self)
107 0 : }
108 : }
109 :
110 : impl AsLayerDesc for Layer {
111 1506590 : fn layer_desc(&self) -> &PersistentLayerDesc {
112 1506590 : self.0.layer_desc()
113 1506590 : }
114 : }
115 :
116 : impl PartialEq for Layer {
117 10 : fn eq(&self, other: &Self) -> bool {
118 10 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
119 10 : }
120 : }
121 :
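/// Illustrative sketch of the two path styles produced below (hypothetical names; the exact
/// generation suffix comes from [`Generation::get_suffix`]):
///
/// ```text
/// <timeline_dir>/<layer_file_name>             generation.is_none(), legacy style
/// <timeline_dir>/<layer_file_name>-v1<suffix>  otherwise
/// ```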
122 5148 : pub(crate) fn local_layer_path(
123 5148 : conf: &PageServerConf,
124 5148 : tenant_shard_id: &TenantShardId,
125 5148 : timeline_id: &TimelineId,
126 5148 : layer_file_name: &LayerName,
127 5148 : generation: &Generation,
128 5148 : ) -> Utf8PathBuf {
129 5148 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
130 5148 :
131 5148 : if generation.is_none() {
132 : // Without a generation, we may only use legacy path style
133 0 : timeline_path.join(layer_file_name.to_string())
134 : } else {
135 5148 : timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
136 : }
137 5148 : }
138 :
139 : impl Layer {
140 : /// Creates a layer value for a file we know to not be resident.
141 0 : pub(crate) fn for_evicted(
142 0 : conf: &'static PageServerConf,
143 0 : timeline: &Arc<Timeline>,
144 0 : file_name: LayerName,
145 0 : metadata: LayerFileMetadata,
146 0 : ) -> Self {
147 0 : let local_path = local_layer_path(
148 0 : conf,
149 0 : &timeline.tenant_shard_id,
150 0 : &timeline.timeline_id,
151 0 : &file_name,
152 0 : &metadata.generation,
153 0 : );
154 0 :
155 0 : let desc = PersistentLayerDesc::from_filename(
156 0 : timeline.tenant_shard_id,
157 0 : timeline.timeline_id,
158 0 : file_name,
159 0 : metadata.file_size,
160 0 : );
161 0 :
162 0 : let owner = Layer(Arc::new(LayerInner::new(
163 0 : conf,
164 0 : timeline,
165 0 : local_path,
166 0 : desc,
167 0 : None,
168 0 : metadata.generation,
169 0 : metadata.shard,
170 0 : )));
171 0 :
172 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
173 :
174 0 : owner
175 0 : }
176 :
177 : /// Creates a Layer value for a file we know to be resident in the timeline directory.
178 72 : pub(crate) fn for_resident(
179 72 : conf: &'static PageServerConf,
180 72 : timeline: &Arc<Timeline>,
181 72 : local_path: Utf8PathBuf,
182 72 : file_name: LayerName,
183 72 : metadata: LayerFileMetadata,
184 72 : ) -> ResidentLayer {
185 72 : let desc = PersistentLayerDesc::from_filename(
186 72 : timeline.tenant_shard_id,
187 72 : timeline.timeline_id,
188 72 : file_name,
189 72 : metadata.file_size,
190 72 : );
191 72 :
192 72 : let mut resident = None;
193 72 :
194 72 : let owner = Layer(Arc::new_cyclic(|owner| {
195 72 : let inner = Arc::new(DownloadedLayer {
196 72 : owner: owner.clone(),
197 72 : kind: tokio::sync::OnceCell::default(),
198 72 : version: 0,
199 72 : });
200 72 : resident = Some(inner.clone());
201 72 :
202 72 : LayerInner::new(
203 72 : conf,
204 72 : timeline,
205 72 : local_path,
206 72 : desc,
207 72 : Some(inner),
208 72 : metadata.generation,
209 72 : metadata.shard,
210 72 : )
211 72 : }));
212 72 :
213 72 : let downloaded = resident.expect("just initialized");
214 72 :
215 72 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
216 :
217 72 : timeline
218 72 : .metrics
219 72 : .resident_physical_size_add(metadata.file_size);
220 72 :
221 72 : ResidentLayer { downloaded, owner }
222 72 : }
223 :
224 : /// Creates a Layer value for a freshly written-out new layer file by renaming it from a
225 : /// temporary path.
226 5124 : pub(crate) fn finish_creating(
227 5124 : conf: &'static PageServerConf,
228 5124 : timeline: &Arc<Timeline>,
229 5124 : desc: PersistentLayerDesc,
230 5124 : temp_path: &Utf8Path,
231 5124 : ) -> anyhow::Result<ResidentLayer> {
232 5124 : let mut resident = None;
233 5124 :
234 5124 : let owner = Layer(Arc::new_cyclic(|owner| {
235 5124 : let inner = Arc::new(DownloadedLayer {
236 5124 : owner: owner.clone(),
237 5124 : kind: tokio::sync::OnceCell::default(),
238 5124 : version: 0,
239 5124 : });
240 5124 : resident = Some(inner.clone());
241 5124 :
242 5124 : let local_path = local_layer_path(
243 5124 : conf,
244 5124 : &timeline.tenant_shard_id,
245 5124 : &timeline.timeline_id,
246 5124 : &desc.layer_name(),
247 5124 : &timeline.generation,
248 5124 : );
249 5124 :
250 5124 : LayerInner::new(
251 5124 : conf,
252 5124 : timeline,
253 5124 : local_path,
254 5124 : desc,
255 5124 : Some(inner),
256 5124 : timeline.generation,
257 5124 : timeline.get_shard_index(),
258 5124 : )
259 5124 : }));
260 5124 :
261 5124 : let downloaded = resident.expect("just initialized");
262 5124 :
263 5124 : // We never want to overwrite an existing file, so we use `RENAME_NOREPLACE`.
264 5124 : // TODO: this leaves the temp file in place if the rename fails, risking us running
265 5124 : // out of space. Should we clean it up here or does the calling context deal with this?
266 5124 : utils::fs_ext::rename_noreplace(temp_path.as_std_path(), owner.local_path().as_std_path())
267 5124 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
268 :
269 5124 : Ok(ResidentLayer { downloaded, owner })
270 5124 : }
271 :
272 : /// Requests the layer to be evicted and waits for this to be done.
273 : ///
274 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
275 : ///
276 : /// If for a bad luck or blocking of the executor, we miss the actual eviction and the layer is
277 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
278 : ///
279 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
280 : /// will happen regardless of whether the future returned by this method completes, unless there is a
281 : /// read access before eviction gets to complete.
282 : ///
283 : /// Technically cancellation safe, but cancelling might shift which generation of the
284 : /// download-evict cycle a retry observes.
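///
/// Illustrative sketch of handling the possible outcomes (assumes a `layer: &Layer` in
/// scope inside this crate; the variants are those of [`EvictionError`]):
///
/// ```ignore
/// match layer.evict_and_wait(Duration::from_secs(5)).await {
///     Ok(()) => { /* eviction completed */ }
///     Err(EvictionError::NotFound) => { /* not resident, or an eviction is already pending */ }
///     Err(EvictionError::Downloaded) => { /* a read won; the layer is (again) resident */ }
///     Err(EvictionError::Timeout) => { /* still evicting, we just stopped waiting */ }
/// }
/// ```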
285 108 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
286 146 : self.0.evict_and_wait(timeout).await
287 96 : }
288 :
289 : /// Delete the layer file when `self` gets dropped, and also try to schedule a remote index
290 : /// upload afterwards.
291 : ///
292 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
293 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
294 : /// the value this is called on gets dropped.
295 : ///
296 : /// This is ensured by both of those methods accepting references to Layer.
297 : ///
298 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
299 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
300 1410 : pub(crate) fn delete_on_drop(&self) {
301 1410 : self.0.delete_on_drop();
302 1410 : }
303 :
304 635707 : pub(crate) async fn get_values_reconstruct_data(
305 635707 : &self,
306 635707 : keyspace: KeySpace,
307 635707 : lsn_range: Range<Lsn>,
308 635707 : reconstruct_data: &mut ValuesReconstructState,
309 635707 : ctx: &RequestContext,
310 635707 : ) -> Result<(), GetVectoredError> {
311 635707 : let layer = self
312 635707 : .0
313 635707 : .get_or_maybe_download(true, Some(ctx))
314 13 : .await
315 635707 : .map_err(|err| match err {
316 : DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
317 0 : GetVectoredError::Cancelled
318 : }
319 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
320 635707 : })?;
321 :
322 635707 : self.record_access(ctx);
323 635707 :
324 635707 : layer
325 635707 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
326 635707 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
327 285784 : .await
328 635707 : .map_err(|err| match err {
329 0 : GetVectoredError::Other(err) => GetVectoredError::Other(
330 0 : err.context(format!("get_values_reconstruct_data for layer {self}")),
331 0 : ),
332 0 : err => err,
333 635707 : })
334 635707 : }
335 :
336 : /// Download the layer if evicted.
337 : ///
338 : /// Will not error when the layer is already downloaded.
339 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
340 0 : self.0.get_or_maybe_download(true, None).await?;
341 0 : Ok(())
342 0 : }
343 :
344 : /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
345 : /// while the guard exists.
346 : ///
347 : /// Returns None if the layer is currently evicted or becoming evicted.
348 : #[cfg(test)]
349 60 : pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
350 60 : let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
351 :
352 42 : Some(ResidentLayer {
353 42 : downloaded,
354 42 : owner: self.clone(),
355 42 : })
356 60 : }
357 :
358 : /// Weak indicator of whether the layer is resident or not. Good enough for eviction, which can deal
359 : /// with `EvictionError::NotFound`.
360 : ///
361 : /// Returns `true` if this layer might be resident, or `false` if it has most likely been evicted
362 : /// or will be unless a read happens soon.
363 160 : pub(crate) fn is_likely_resident(&self) -> bool {
364 160 : self.0
365 160 : .inner
366 160 : .get()
367 160 : .map(|rowe| rowe.is_likely_resident())
368 160 : .unwrap_or(false)
369 160 : }
370 :
371 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
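///
/// Illustrative sketch only (`work_on` is a hypothetical helper): hold the returned
/// [`ResidentLayer`] for as long as the file must stay on disk.
///
/// ```ignore
/// let resident: ResidentLayer = layer.download_and_keep_resident().await?;
/// work_on(&resident).await;
/// drop(resident); // the layer becomes evictable again
/// ```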
372 1488 : pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
373 1488 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
374 :
375 1488 : Ok(ResidentLayer {
376 1488 : downloaded,
377 1488 : owner: self.clone(),
378 1488 : })
379 1488 : }
380 :
381 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
382 0 : self.0.info(reset)
383 0 : }
384 :
385 24 : pub(crate) fn latest_activity(&self) -> SystemTime {
386 24 : self.0.access_stats.latest_activity()
387 24 : }
388 :
389 30 : pub(crate) fn visibility(&self) -> LayerVisibilityHint {
390 30 : self.0.access_stats.visibility()
391 30 : }
392 :
393 5130 : pub(crate) fn local_path(&self) -> &Utf8Path {
394 5130 : &self.0.path
395 5130 : }
396 :
397 5808 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
398 5808 : self.0.metadata()
399 5808 : }
400 :
401 0 : pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
402 0 : self.0
403 0 : .timeline
404 0 : .upgrade()
405 0 : .map(|timeline| timeline.timeline_id)
406 0 : }
407 :
408 : /// Traditional debug dumping facility
409 : #[allow(unused)]
410 12 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
411 12 : self.0.desc.dump();
412 12 :
413 12 : if verbose {
414 : // for now, unconditionally download everything, even if that might not be wanted.
415 12 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
416 24 : l.dump(&self.0, ctx).await?
417 0 : }
418 :
419 12 : Ok(())
420 12 : }
421 :
422 : /// Waits until this layer has been dropped (and if needed, local file deletion and remote
423 : /// deletion scheduling has completed).
424 : ///
425 : /// Does not start local deletion; use [`Self::delete_on_drop`] for that
426 : /// separately.
427 : #[cfg(any(feature = "testing", test))]
428 6 : pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
429 6 : let mut rx = self.0.status.as_ref().unwrap().subscribe();
430 :
431 6 : async move {
432 : loop {
433 18 : if rx.changed().await.is_err() {
434 6 : break;
435 0 : }
436 : }
437 6 : }
438 6 : }
439 :
440 636913 : fn record_access(&self, ctx: &RequestContext) {
441 636913 : if self.0.access_stats.record_access(ctx) {
442 : // Visibility was modified to Visible
443 0 : tracing::info!(
444 0 : "Layer {} became visible as a result of access",
445 0 : self.0.desc.key()
446 : );
447 0 : if let Some(tl) = self.0.timeline.upgrade() {
448 0 : tl.metrics
449 0 : .visible_physical_size_gauge
450 0 : .add(self.0.desc.file_size)
451 0 : }
452 636913 : }
453 636913 : }
454 :
455 8082 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
456 8082 : let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
457 : use LayerVisibilityHint::*;
458 8082 : match (old_visibility, visibility) {
459 : (Visible, Covered) => {
460 : // Subtract this layer's contribution to the visible size metric
461 108 : if let Some(tl) = self.0.timeline.upgrade() {
462 108 : debug_assert!(
463 108 : tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
464 : );
465 108 : tl.metrics
466 108 : .visible_physical_size_gauge
467 108 : .sub(self.0.desc.file_size)
468 0 : }
469 : }
470 : (Covered, Visible) => {
471 : // Add this layer's contribution to the visible size metric
472 0 : if let Some(tl) = self.0.timeline.upgrade() {
473 0 : tl.metrics
474 0 : .visible_physical_size_gauge
475 0 : .add(self.0.desc.file_size)
476 0 : }
477 : }
478 7974 : (Covered, Covered) | (Visible, Visible) => {
479 7974 : // no change
480 7974 : }
481 : }
482 8082 : }
483 : }
484 :
485 : /// The downloaded state ([`DownloadedLayer`]) can be either resident or wanted evicted.
486 : ///
487 : /// However, when we want something evicted, we cannot evict it right away, as there might be current
488 : /// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
489 : /// read with [`Layer::get_values_reconstruct_data`].
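///
/// A minimal sketch of the intended cycle (not a real call site; `downloaded` stands for
/// some `Arc<DownloadedLayer>`):
///
/// ```ignore
/// let mut rowe = ResidentOrWantedEvicted::Resident(downloaded);
/// // the first eviction request downgrades to a Weak and hands back the strong
/// // reference, which should be dropped outside of the OnceCell's mutex
/// let last_strong = rowe.downgrade();
/// assert!(last_strong.is_some());
/// // a read arriving before the file was removed upgrades back to Resident; the
/// // caller must then account for the eviction as cancelled
/// match rowe.get_and_upgrade() {
///     Some((_strong, true)) => { /* upgraded from a Weak: cancel the pending eviction */ }
///     Some((_strong, false)) => { /* was still Resident */ }
///     None => { /* the downloaded file is gone or going away */ }
/// }
/// ```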
490 : ///
491 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
492 : #[derive(Debug)]
493 : enum ResidentOrWantedEvicted {
494 : Resident(Arc<DownloadedLayer>),
495 : WantedEvicted(Weak<DownloadedLayer>, usize),
496 : }
497 :
498 : impl ResidentOrWantedEvicted {
499 : /// Non-mutating access to the DownloadedLayer, if possible.
500 : ///
501 : /// This is not used on the read path (anything that calls
502 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
503 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
504 : #[cfg(test)]
505 42 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
506 42 : match self {
507 42 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
508 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
509 : }
510 42 : }
511 :
512 : /// Best-effort query for residency right now, not as strong a guarantee as receiving a strong
513 : /// reference from `ResidentOrWantedEvicted::get`.
514 142 : fn is_likely_resident(&self) -> bool {
515 142 : match self {
516 124 : ResidentOrWantedEvicted::Resident(_) => true,
517 18 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
518 : }
519 142 : }
520 :
521 : /// Upgrades any weak to strong if possible.
522 : ///
523 : /// Returns a strong reference if possible, along with a boolean telling whether an upgrade
524 : /// happened.
525 637219 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
526 637219 : match self {
527 637195 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
528 24 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
529 0 : Some(strong) => {
530 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
531 0 :
532 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
533 0 :
534 0 : Some((strong, true))
535 : }
536 24 : None => None,
537 : },
538 : }
539 637219 : }
540 :
541 : /// When eviction is first requested, drop down to holding a [`Weak`].
542 : ///
543 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
544 : /// drop the possibly last strong reference outside of the mutex of
545 : /// [`heavier_once_cell::OnceCell`].
546 90 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
547 90 : match self {
548 78 : ResidentOrWantedEvicted::Resident(strong) => {
549 78 : let weak = Arc::downgrade(strong);
550 78 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
551 78 : std::mem::swap(self, &mut temp);
552 78 : match temp {
553 78 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
554 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
555 : }
556 : }
557 12 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
558 : }
559 90 : }
560 : }
561 :
562 : struct LayerInner {
563 : /// Only needed to check ondemand_download_behavior_treat_error_as_warn and for the creation of
564 : /// [`Self::path`].
565 : conf: &'static PageServerConf,
566 :
567 : /// Full path to the file; unclear if this should exist anymore.
568 : path: Utf8PathBuf,
569 :
570 : desc: PersistentLayerDesc,
571 :
572 : /// Timeline access is needed for remote timeline client and metrics.
573 : ///
574 : /// There should not be an access to timeline for any reason without entering the
575 : /// [`Timeline::gate`] at the same time.
576 : timeline: Weak<Timeline>,
577 :
578 : access_stats: LayerAccessStats,
579 :
580 : /// This custom OnceCell is backed by a std mutex, which is only held for short time periods.
581 : ///
582 : /// Filesystem changes (download, evict) are only done while holding a permit which the
583 : /// `heavier_once_cell` provides.
584 : ///
585 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
586 : /// possibly read while not holding it.
587 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
588 :
589 : /// Do we want to delete locally and remotely this when `LayerInner` is dropped
590 : wanted_deleted: AtomicBool,
591 :
592 : /// The version makes sure we will only evict a specific initialization of the downloaded file.
593 : ///
594 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
595 : /// `ResidentOrWantedEvicted::WantedEvicted`.
596 : version: AtomicUsize,
597 :
598 : /// Allows subscribing to when the layer actually gets evicted, or when a non-cancellable
599 : /// download starts or completes.
600 : ///
601 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
602 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
603 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
604 : /// [`ResidentOrWantedEvicted::Resident`] on access.
605 : ///
606 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
607 : status: Option<tokio::sync::watch::Sender<Status>>,
608 :
609 : /// Counter for exponential backoff with the download.
610 : ///
611 : /// This is atomic only for the purposes of having additional data only accessed while holding
612 : /// the InitPermit.
613 : consecutive_failures: AtomicUsize,
614 :
615 : /// The generation of this Layer.
616 : ///
617 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
618 : /// for created layers from [`Timeline::generation`].
619 : generation: Generation,
620 :
621 : /// The shard of this Layer.
622 : ///
623 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
624 : /// current `ShardIdentity` (TODO: add link once it's introduced).
625 : ///
626 : /// For loaded layers, this may be some other value if the tenant has undergone
627 : /// a shard split since the layer was originally written.
628 : shard: ShardIndex,
629 :
630 : /// When the Layer was last evicted but has not been downloaded since.
631 : ///
632 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
633 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
634 :
635 : #[cfg(test)]
636 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
637 : }
638 :
639 : impl std::fmt::Display for LayerInner {
640 90 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641 90 : write!(f, "{}", self.layer_desc().short_id())
642 90 : }
643 : }
644 :
645 : impl AsLayerDesc for LayerInner {
646 1513610 : fn layer_desc(&self) -> &PersistentLayerDesc {
647 1513610 : &self.desc
648 1513610 : }
649 : }
650 :
651 : #[derive(Debug, Clone, Copy)]
652 : enum Status {
653 : Resident,
654 : Evicted,
655 : Downloading,
656 : }
657 :
658 : impl Drop for LayerInner {
659 1722 : fn drop(&mut self) {
660 1722 : // if there was a pending eviction, mark it cancelled here to balance metrics
661 1722 : if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
662 6 : {
663 6 : // eviction has already been started
664 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
665 6 :
666 6 : // eviction request is intentionally not honored as no one is present to wait for it
667 6 : // and we could be delaying shutdown for nothing.
668 1716 : }
669 :
670 1722 : if let Some(timeline) = self.timeline.upgrade() {
671 : // Only need to decrement metrics if the timeline still exists: otherwise
672 : // it will have already de-registered these metrics via TimelineMetrics::shutdown
673 1674 : if self.desc.is_delta() {
674 1482 : timeline.metrics.layer_count_delta.dec();
675 1482 : timeline.metrics.layer_size_delta.sub(self.desc.file_size);
676 1482 : } else {
677 192 : timeline.metrics.layer_count_image.dec();
678 192 : timeline.metrics.layer_size_image.sub(self.desc.file_size);
679 192 : }
680 :
681 1674 : if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
682 1656 : debug_assert!(
683 1656 : timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
684 : );
685 1656 : timeline
686 1656 : .metrics
687 1656 : .visible_physical_size_gauge
688 1656 : .sub(self.desc.file_size);
689 18 : }
690 48 : }
691 :
692 1722 : if !*self.wanted_deleted.get_mut() {
693 336 : return;
694 1386 : }
695 :
696 1386 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
697 :
698 1386 : let path = std::mem::take(&mut self.path);
699 1386 : let file_name = self.layer_desc().layer_name();
700 1386 : let file_size = self.layer_desc().file_size;
701 1386 : let timeline = self.timeline.clone();
702 1386 : let meta = self.metadata();
703 1386 : let status = self.status.take();
704 1386 :
705 1386 : Self::spawn_blocking(move || {
706 1383 : let _g = span.entered();
707 1383 :
708 1383 : // carry this until we are finished for [`Layer::wait_drop`] support
709 1383 : let _status = status;
710 :
711 1383 : let Some(timeline) = timeline.upgrade() else {
712 : // no need to nag that the timeline is gone: in the normal situation, on
713 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
714 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
715 0 : return;
716 : };
717 :
718 1383 : let Ok(_guard) = timeline.gate.enter() else {
719 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
720 0 : return;
721 : };
722 :
723 1383 : let removed = match std::fs::remove_file(path) {
724 1377 : Ok(()) => true,
725 6 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
726 6 : // until we no longer do detaches by removing all local files before removing the
727 6 : // tenant from the global map, we will always get these errors even if we knew
728 6 : // what the latest state is.
729 6 : //
730 6 : // we currently do not track the latest state, so we'll also end up here on evicted
731 6 : // layers.
732 6 : false
733 : }
734 0 : Err(e) => {
735 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
736 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
737 0 : false
738 : }
739 : };
740 :
741 1383 : if removed {
742 1377 : timeline.metrics.resident_physical_size_sub(file_size);
743 1377 : }
744 1383 : let res = timeline
745 1383 : .remote_client
746 1383 : .schedule_deletion_of_unlinked(vec![(file_name, meta)]);
747 :
748 1383 : if let Err(e) = res {
749 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
750 : // demonstrating this deadlock (without spawn_blocking): stop will drop
751 : // queued items, which will have ResidentLayer's, and those drops would try
752 : // to re-entrantly lock the RemoteTimelineClient inner state.
753 0 : if !timeline.is_active() {
754 0 : tracing::info!("scheduling deletion on drop failed: {e:#}");
755 : } else {
756 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
757 : }
758 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
759 1383 : } else {
760 1383 : LAYER_IMPL_METRICS.inc_completed_deletes();
761 1383 : }
762 1386 : });
763 1722 : }
764 : }
765 :
766 : impl LayerInner {
767 : #[allow(clippy::too_many_arguments)]
768 5196 : fn new(
769 5196 : conf: &'static PageServerConf,
770 5196 : timeline: &Arc<Timeline>,
771 5196 : local_path: Utf8PathBuf,
772 5196 : desc: PersistentLayerDesc,
773 5196 : downloaded: Option<Arc<DownloadedLayer>>,
774 5196 : generation: Generation,
775 5196 : shard: ShardIndex,
776 5196 : ) -> Self {
777 5196 : let (inner, version, init_status) = if let Some(inner) = downloaded {
778 5196 : let version = inner.version;
779 5196 : let resident = ResidentOrWantedEvicted::Resident(inner);
780 5196 : (
781 5196 : heavier_once_cell::OnceCell::new(resident),
782 5196 : version,
783 5196 : Status::Resident,
784 5196 : )
785 : } else {
786 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
787 : };
788 :
789 : // This object acts as a RAII guard on these metrics: increment on construction
790 5196 : if desc.is_delta() {
791 4266 : timeline.metrics.layer_count_delta.inc();
792 4266 : timeline.metrics.layer_size_delta.add(desc.file_size);
793 4266 : } else {
794 930 : timeline.metrics.layer_count_image.inc();
795 930 : timeline.metrics.layer_size_image.add(desc.file_size);
796 930 : }
797 :
798 : // New layers are visible by default. This metric is later updated on drop or in set_visibility
799 5196 : timeline
800 5196 : .metrics
801 5196 : .visible_physical_size_gauge
802 5196 : .add(desc.file_size);
803 5196 :
804 5196 : LayerInner {
805 5196 : conf,
806 5196 : path: local_path,
807 5196 : desc,
808 5196 : timeline: Arc::downgrade(timeline),
809 5196 : access_stats: Default::default(),
810 5196 : wanted_deleted: AtomicBool::new(false),
811 5196 : inner,
812 5196 : version: AtomicUsize::new(version),
813 5196 : status: Some(tokio::sync::watch::channel(init_status).0),
814 5196 : consecutive_failures: AtomicUsize::new(0),
815 5196 : generation,
816 5196 : shard,
817 5196 : last_evicted_at: std::sync::Mutex::default(),
818 5196 : #[cfg(test)]
819 5196 : failpoints: Default::default(),
820 5196 : }
821 5196 : }
822 :
823 1410 : fn delete_on_drop(&self) {
824 1410 : let res =
825 1410 : self.wanted_deleted
826 1410 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
827 1410 :
828 1410 : if res.is_ok() {
829 1398 : LAYER_IMPL_METRICS.inc_started_deletes();
830 1398 : }
831 1410 : }
832 :
833 : /// Cancellation safe; however, dropping the future and calling this method again might result
834 : /// in a new attempt to evict OR join the previously started attempt.
835 216 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
836 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
837 : let mut rx = self.status.as_ref().unwrap().subscribe();
838 :
839 : {
840 : let current = rx.borrow_and_update();
841 : match &*current {
842 : Status::Resident => {
843 : // we might get lucky and evict this; continue
844 : }
845 : Status::Evicted | Status::Downloading => {
846 : // it is already evicted
847 : return Err(EvictionError::NotFound);
848 : }
849 : }
850 : }
851 :
852 : let strong = {
853 : match self.inner.get() {
854 : Some(mut either) => either.downgrade(),
855 : None => {
856 : // we already have a scheduled eviction, which just has not gotten to run yet.
857 : // it might still race with a read access, but that could also get cancelled,
858 : // so let's say this is not evictable.
859 : return Err(EvictionError::NotFound);
860 : }
861 : }
862 : };
863 :
864 : if strong.is_some() {
865 : // drop the DownloadedLayer outside of the holding the guard
866 : drop(strong);
867 :
868 : // idea here is that only one evicter should ever get to witness a strong reference,
869 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
870 : // cancelled eviction and signal us, like it currently does.
871 : //
872 : // a second concurrent evict_and_wait will not see a strong reference.
873 : LAYER_IMPL_METRICS.inc_started_evictions();
874 : }
875 :
876 : let changed = rx.changed();
877 : let changed = tokio::time::timeout(timeout, changed).await;
878 :
879 : let Ok(changed) = changed else {
880 : return Err(EvictionError::Timeout);
881 : };
882 :
883 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
884 :
885 : let current = rx.borrow_and_update();
886 :
887 : match &*current {
888 : // the easiest case
889 : Status::Evicted => Ok(()),
890 : // it surely was evicted in between, but then there was a new access now; we can't know
891 : // if it'll succeed so lets just call it evicted
892 : Status::Downloading => Ok(()),
893 : // either the download which was started after eviction completed already, or it was
894 : // never evicted
895 : Status::Resident => Err(EvictionError::Downloaded),
896 : }
897 : }
898 :
899 : /// Cancellation safe.
900 637243 : async fn get_or_maybe_download(
901 637243 : self: &Arc<Self>,
902 637243 : allow_download: bool,
903 637243 : ctx: Option<&RequestContext>,
904 637243 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
905 48 : let (weak, permit) = {
906 : // get_or_init_detached can:
907 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
908 : // - be slow (wait for semaphore permit or closing)
909 637243 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
910 :
911 637243 : let locked = self
912 637243 : .inner
913 637243 : .get_or_init_detached()
914 10 : .await
915 637243 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
916 637243 :
917 637243 : scopeguard::ScopeGuard::into_inner(init_cancelled);
918 :
919 637195 : match locked {
920 : // this path could had been a RwLock::read
921 637195 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
922 0 : Ok(Ok((strong, _))) => {
923 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
924 0 : // previously a `evict_and_wait` was received. this is the only place when we
925 0 : // send out an update without holding the InitPermit.
926 0 : //
927 0 : // note that we also have dropped the Guard; this is fine, because we just made
928 0 : // a state change and are holding a strong reference to be returned.
929 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
930 0 : LAYER_IMPL_METRICS
931 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
932 0 :
933 0 : return Ok(strong);
934 : }
935 24 : Ok(Err(guard)) => {
936 24 : // path to here: we won the eviction, the file should still be on the disk.
937 24 : let (weak, permit) = guard.take_and_deinit();
938 24 : (Some(weak), permit)
939 : }
940 24 : Err(permit) => (None, permit),
941 : }
942 : };
943 :
944 48 : if let Some(weak) = weak {
945 : // only drop the weak after dropping the heavier_once_cell guard
946 24 : assert!(
947 24 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
948 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
949 : );
950 24 : }
951 :
952 48 : let timeline = self
953 48 : .timeline
954 48 : .upgrade()
955 48 : .ok_or_else(|| DownloadError::TimelineShutdown)?;
956 :
957 : // count cancellations, which currently remain largely unexpected
958 48 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
959 :
960 : // check if we really need to be downloaded: this can happen if a read access won the
961 : // semaphore before eviction.
962 : //
963 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
964 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
965 48 : let needs_download = self
966 48 : .needs_download()
967 48 : .await
968 48 : .map_err(DownloadError::PreStatFailed);
969 48 :
970 48 : scopeguard::ScopeGuard::into_inner(init_cancelled);
971 :
972 48 : let needs_download = needs_download?;
973 :
974 48 : let Some(reason) = needs_download else {
975 : // the file is present locally because eviction has not had a chance to run yet
976 :
977 : #[cfg(test)]
978 24 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
979 6 : .await?;
980 :
981 18 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
982 18 :
983 18 : return Ok(self.initialize_after_layer_is_on_disk(permit));
984 : };
985 :
986 : // we must download; getting cancelled before spawning the download is not an issue as
987 : // any still running eviction would not find anything to evict.
988 :
989 24 : if let NeedsDownload::NotFile(ft) = reason {
990 0 : return Err(DownloadError::NotFile(ft));
991 24 : }
992 :
993 24 : if let Some(ctx) = ctx {
994 6 : self.check_expected_download(ctx)?;
995 18 : }
996 :
997 24 : if !allow_download {
998 : // this is only used from tests, but it is hard to test without the boolean
999 6 : return Err(DownloadError::DownloadRequired);
1000 18 : }
1001 18 :
1002 18 : let download_ctx = ctx
1003 18 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
1004 18 : .unwrap_or(RequestContext::new(
1005 18 : TaskKind::LayerDownload,
1006 18 : DownloadBehavior::Download,
1007 18 : ));
1008 :
1009 18 : async move {
1010 18 : tracing::info!(%reason, "downloading on-demand");
1011 :
1012 18 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1013 18 : let res = self
1014 18 : .download_init_and_wait(timeline, permit, download_ctx)
1015 30 : .await?;
1016 18 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1017 18 : Ok(res)
1018 18 : }
1019 18 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
1020 30 : .await
1021 637243 : }
1022 :
1023 : /// Nag or fail per RequestContext policy
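///
/// A sketch of the policy (no new behaviour): [`DownloadBehavior::Download`] always allows
/// the on-demand download, `Warn` logs and allows it, and `Error` refuses it unless
/// `ondemand_download_behavior_treat_error_as_warn` is set in the pageserver config.
/// A caller that expects to download therefore builds its context along these lines
/// (hypothetical call site):
///
/// ```ignore
/// let ctx = RequestContext::new(TaskKind::LayerDownload, DownloadBehavior::Download);
/// layer.get_values_reconstruct_data(keyspace, lsn_range, &mut state, &ctx).await?;
/// ```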
1024 6 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1025 : use crate::context::DownloadBehavior::*;
1026 6 : let b = ctx.download_behavior();
1027 6 : match b {
1028 6 : Download => Ok(()),
1029 : Warn | Error => {
1030 0 : tracing::info!(
1031 0 : "unexpectedly on-demand downloading for task kind {:?}",
1032 0 : ctx.task_kind()
1033 : );
1034 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1035 :
1036 0 : let really_error =
1037 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1038 :
1039 0 : if really_error {
1040 : // this check is only probabilistic, seems like a flakiness footgun
1041 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1042 : } else {
1043 0 : Ok(())
1044 : }
1045 : }
1046 : }
1047 6 : }
1048 :
1049 : /// Actual download, at most one is executed at the time.
1050 18 : async fn download_init_and_wait(
1051 18 : self: &Arc<Self>,
1052 18 : timeline: Arc<Timeline>,
1053 18 : permit: heavier_once_cell::InitPermit,
1054 18 : ctx: RequestContext,
1055 18 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1056 18 : debug_assert_current_span_has_tenant_and_timeline_id();
1057 18 :
1058 18 : let (tx, rx) = tokio::sync::oneshot::channel();
1059 18 :
1060 18 : let this: Arc<Self> = self.clone();
1061 :
1062 18 : let guard = timeline
1063 18 : .gate
1064 18 : .enter()
1065 18 : .map_err(|_| DownloadError::DownloadCancelled)?;
1066 :
1067 18 : Self::spawn(
1068 18 : async move {
1069 0 : let _guard = guard;
1070 0 :
1071 0 : // now that we have commited to downloading, send out an update to:
1072 0 : // - unhang any pending eviction
1073 0 : // - break out of evict_and_wait
1074 0 : this.status
1075 0 : .as_ref()
1076 0 : .unwrap()
1077 0 : .send_replace(Status::Downloading);
1078 18 :
1079 18 : #[cfg(test)]
1080 18 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1081 6 : .await
1082 18 : .unwrap();
1083 :
1084 292 : let res = this.download_and_init(timeline, permit, &ctx).await;
1085 :
1086 18 : if let Err(res) = tx.send(res) {
1087 0 : match res {
1088 0 : Ok(_res) => {
1089 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1090 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1091 : }
1092 0 : Err(e) => {
1093 0 : tracing::info!(
1094 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1095 : );
1096 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1097 : }
1098 : }
1099 18 : }
1100 18 : }
1101 18 : .in_current_span(),
1102 18 : );
1103 18 :
1104 30 : match rx.await {
1105 18 : Ok(Ok(res)) => Ok(res),
1106 : Ok(Err(remote_storage::DownloadError::Cancelled)) => {
1107 0 : Err(DownloadError::DownloadCancelled)
1108 : }
1109 0 : Ok(Err(_)) => Err(DownloadError::DownloadFailed),
1110 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1111 : }
1112 18 : }
1113 :
1114 18 : async fn download_and_init(
1115 18 : self: &Arc<LayerInner>,
1116 18 : timeline: Arc<Timeline>,
1117 18 : permit: heavier_once_cell::InitPermit,
1118 18 : ctx: &RequestContext,
1119 18 : ) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
1120 18 : let result = timeline
1121 18 : .remote_client
1122 18 : .download_layer_file(
1123 18 : &self.desc.layer_name(),
1124 18 : &self.metadata(),
1125 18 : &self.path,
1126 18 : &timeline.cancel,
1127 18 : ctx,
1128 18 : )
1129 275 : .await;
1130 :
1131 18 : match result {
1132 18 : Ok(size) => {
1133 18 : assert_eq!(size, self.desc.file_size);
1134 :
1135 18 : match self.needs_download().await {
1136 0 : Ok(Some(reason)) => {
1137 0 : // this is really a bug in needs_download or remote timeline client
1138 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1139 : }
1140 18 : Ok(None) => {
1141 18 : // as expected
1142 18 : }
1143 0 : Err(e) => {
1144 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1145 : }
1146 : }
1147 :
1148 18 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1149 18 : timeline
1150 18 : .metrics
1151 18 : .resident_physical_size_add(self.desc.file_size);
1152 18 : self.consecutive_failures.store(0, Ordering::Relaxed);
1153 18 :
1154 18 : let since_last_eviction = self
1155 18 : .last_evicted_at
1156 18 : .lock()
1157 18 : .unwrap()
1158 18 : .take()
1159 18 : .map(|ts| ts.elapsed());
1160 18 : if let Some(since_last_eviction) = since_last_eviction {
1161 18 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1162 18 : }
1163 :
1164 18 : self.access_stats.record_residence_event();
1165 18 :
1166 18 : Ok(self.initialize_after_layer_is_on_disk(permit))
1167 : }
1168 0 : Err(e) => {
1169 0 : let consecutive_failures =
1170 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1171 0 :
1172 0 : if timeline.cancel.is_cancelled() {
1173 : // If we're shutting down, drop out before logging the error
1174 0 : return Err(e);
1175 0 : }
1176 0 :
1177 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1178 :
1179 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1180 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1181 0 : 1.5,
1182 0 : 60.0,
1183 0 : );
1184 0 :
1185 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1186 0 :
1187 0 : tokio::select! {
1188 0 : _ = tokio::time::sleep(backoff) => {},
1189 0 : _ = timeline.cancel.cancelled() => {},
1190 : };
1191 :
1192 0 : Err(e)
1193 : }
1194 : }
1195 18 : }
1196 :
1197 : /// Initializes the `Self::inner` to a "resident" state.
1198 : ///
1199 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1200 : /// before calling this method.
1201 : ///
1202 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1203 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1204 36 : fn initialize_after_layer_is_on_disk(
1205 36 : self: &Arc<LayerInner>,
1206 36 : permit: heavier_once_cell::InitPermit,
1207 36 : ) -> Arc<DownloadedLayer> {
1208 36 : debug_assert_current_span_has_tenant_and_timeline_id();
1209 36 :
1210 36 : // disable any scheduled but not yet running eviction deletions for this initialization
1211 36 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1212 36 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1213 36 :
1214 36 : let res = Arc::new(DownloadedLayer {
1215 36 : owner: Arc::downgrade(self),
1216 36 : kind: tokio::sync::OnceCell::default(),
1217 36 : version: next_version,
1218 36 : });
1219 36 :
1220 36 : let waiters = self.inner.initializer_count();
1221 36 : if waiters > 0 {
1222 0 : tracing::info!(waiters, "completing layer init for other tasks");
1223 36 : }
1224 :
1225 36 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1226 36 :
1227 36 : self.inner.set(value, permit);
1228 36 :
1229 36 : res
1230 36 : }
1231 :
1232 72 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1233 72 : match tokio::fs::metadata(&self.path).await {
1234 48 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1235 24 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1236 0 : Err(e) => Err(e),
1237 : }
1238 72 : }
1239 :
1240 72 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1241 72 : match self.path.metadata() {
1242 72 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1243 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1244 0 : Err(e) => Err(e),
1245 : }
1246 72 : }
1247 :
1248 120 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1249 120 : // in future, this should include sha2-256 validation of the file.
1250 120 : if !m.is_file() {
1251 0 : Err(NeedsDownload::NotFile(m.file_type()))
1252 120 : } else if m.len() != self.desc.file_size {
1253 0 : Err(NeedsDownload::WrongSize {
1254 0 : actual: m.len(),
1255 0 : expected: self.desc.file_size,
1256 0 : })
1257 : } else {
1258 120 : Ok(())
1259 : }
1260 120 : }
1261 :
1262 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1263 0 : let layer_name = self.desc.layer_name().to_string();
1264 0 :
1265 0 : let resident = self
1266 0 : .inner
1267 0 : .get()
1268 0 : .map(|rowe| rowe.is_likely_resident())
1269 0 : .unwrap_or(false);
1270 0 :
1271 0 : let access_stats = self.access_stats.as_api_model(reset);
1272 0 :
1273 0 : if self.desc.is_delta {
1274 0 : let lsn_range = &self.desc.lsn_range;
1275 0 :
1276 0 : HistoricLayerInfo::Delta {
1277 0 : layer_file_name: layer_name,
1278 0 : layer_file_size: self.desc.file_size,
1279 0 : lsn_start: lsn_range.start,
1280 0 : lsn_end: lsn_range.end,
1281 0 : remote: !resident,
1282 0 : access_stats,
1283 0 : l0: crate::tenant::layer_map::LayerMap::is_l0(
1284 0 : &self.layer_desc().key_range,
1285 0 : self.layer_desc().is_delta,
1286 0 : ),
1287 0 : }
1288 : } else {
1289 0 : let lsn = self.desc.image_layer_lsn();
1290 0 :
1291 0 : HistoricLayerInfo::Image {
1292 0 : layer_file_name: layer_name,
1293 0 : layer_file_size: self.desc.file_size,
1294 0 : lsn_start: lsn,
1295 0 : remote: !resident,
1296 0 : access_stats,
1297 0 : }
1298 : }
1299 0 : }
1300 :
1301 : /// `DownloadedLayer` is being dropped, so it calls this method.
1302 72 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1303 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1304 : // though here it is very likely
1305 72 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1306 :
1307 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1308 : // drop while the `self.inner` is being locked, leading to a deadlock.
1309 :
1310 72 : let start_evicting = async move {
1311 72 : #[cfg(test)]
1312 72 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1313 42 : .await
1314 72 : .expect("failpoint should not have errored");
1315 72 :
1316 72 : tracing::debug!("eviction started");
1317 :
1318 72 : let res = self.wait_for_turn_and_evict(only_version).await;
1319 : // metrics: ignore the Ok branch, it is not done yet
1320 72 : if let Err(e) = res {
1321 18 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1322 18 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1323 54 : }
1324 72 : };
1325 :
1326 72 : Self::spawn(start_evicting.instrument(span));
1327 72 : }
1328 :
1329 72 : async fn wait_for_turn_and_evict(
1330 72 : self: Arc<LayerInner>,
1331 72 : only_version: usize,
1332 72 : ) -> Result<(), EvictionCancelled> {
1333 138 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1334 : use Status::*;
1335 138 : match status {
1336 132 : Resident => Ok(()),
1337 6 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1338 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1339 : }
1340 138 : }
1341 :
1342 72 : let timeline = self
1343 72 : .timeline
1344 72 : .upgrade()
1345 72 : .ok_or(EvictionCancelled::TimelineGone)?;
1346 :
1347 72 : let mut rx = self
1348 72 : .status
1349 72 : .as_ref()
1350 72 : .expect("LayerInner cannot be dropped, holding strong ref")
1351 72 : .subscribe();
1352 72 :
1353 72 : is_good_to_continue(&rx.borrow_and_update())?;
1354 :
1355 66 : let Ok(gate) = timeline.gate.enter() else {
1356 0 : return Err(EvictionCancelled::TimelineGone);
1357 : };
1358 :
1359 54 : let permit = {
1360 : // we cannot just `std::fs::remove_file` because there might already be an
1361 : // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
1362 : // operations must be done while holding the heavier_once_cell::InitPermit
1363 66 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1364 :
1365 66 : let waited = loop {
1366 : // we must race to the Downloading starting, otherwise we would have to wait until the
1367 : // completion of the download. waiting for download could be long and hinder our
1368 : // efforts to alert on "hanging" evictions.
1369 66 : tokio::select! {
1370 66 : res = &mut wait => break res,
1371 66 : _ = rx.changed() => {
1372 0 : is_good_to_continue(&rx.borrow_and_update())?;
1373 : // two possibilities for Status::Resident:
1374 : // - the layer was found locally from disk by a read
1375 : // - we missed a bunch of updates and now the layer is
1376 : // again downloaded -- assume we'll fail later on with
1377 : // version check or AlreadyReinitialized
1378 : }
1379 : }
1380 : };
1381 :
1382 : // re-check now that we have the guard or permit; all updates should have happened
1383 : // while holding the permit.
1384 66 : is_good_to_continue(&rx.borrow_and_update())?;
1385 :
1386 : // the term deinitialize is used here, because clearing out the Weak will eventually
1387 : // lead to deallocating the reference counted value, and the value we take with
1388 : // `Guard::take_and_deinit` is likely to be the last one because the Weak is never cloned.
1389 66 : let (_weak, permit) = match waited {
1390 60 : Ok(guard) => {
1391 60 : match &*guard {
1392 54 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1393 54 : if *version == only_version =>
1394 48 : {
1395 48 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1396 48 : let (weak, permit) = guard.take_and_deinit();
1397 48 : (Some(weak), permit)
1398 : }
1399 6 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1400 6 : // if we were not doing the version check, we would need to try to
1401 6 : // upgrade the weak here to see if it really is dropped. version check
1402 6 : // is done instead assuming that it is cheaper.
1403 6 : tracing::debug!(
1404 : version,
1405 : only_version,
1406 0 : "version mismatch, not deinitializing"
1407 : );
1408 6 : return Err(EvictionCancelled::VersionCheckFailed);
1409 : }
1410 : ResidentOrWantedEvicted::Resident(_) => {
1411 6 : return Err(EvictionCancelled::AlreadyReinitialized);
1412 : }
1413 : }
1414 : }
1415 6 : Err(permit) => {
1416 6 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1417 6 : (None, permit)
1418 : }
1419 : };
1420 :
1421 54 : permit
1422 54 : };
1423 54 :
1424 54 : let span = tracing::Span::current();
1425 54 :
1426 54 : let spawned_at = std::time::Instant::now();
1427 54 :
1428 54 : // this is on purpose a detached spawn; we don't need to wait for it
1429 54 : //
1430 54 : // eviction completion reporting is the only thing hinging on this, and it can be just as
1431 54 : // well from a spawn_blocking thread.
1432 54 : //
1433 54 : // important to note that now that we've acquired the permit we have made sure the evicted
1434 54 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1435 54 : // there are multiple evictions. The rest is not cancellable, and we've now commited to
1436 54 : // evicting.
1437 54 : //
1438 54 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1439 54 : // reads. We will need to add cancellation for that if necessary.
1440 54 : Self::spawn_blocking(move || {
1441 54 : let _span = span.entered();
1442 54 :
1443 54 : let res = self.evict_blocking(&timeline, &gate, &permit);
1444 54 :
1445 54 : let waiters = self.inner.initializer_count();
1446 54 :
1447 54 : if waiters > 0 {
1448 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1449 54 : }
1450 :
1451 54 : let completed_in = spawned_at.elapsed();
1452 54 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1453 54 :
1454 54 : match res {
1455 54 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1456 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1457 : }
1458 :
1459 54 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1460 54 : });
1461 54 :
1462 54 : Ok(())
1463 72 : }
1464 :
1465 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1466 54 : fn evict_blocking(
1467 54 : &self,
1468 54 : timeline: &Timeline,
1469 54 : _gate: &gate::GateGuard,
1470 54 : _permit: &heavier_once_cell::InitPermit,
1471 54 : ) -> Result<(), EvictionCancelled> {
1472 54 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1473 54 :
1474 54 : match capture_mtime_and_remove(&self.path) {
1475 54 : Ok(local_layer_mtime) => {
1476 54 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1477 54 : match duration {
1478 54 : Ok(elapsed) => {
1479 54 : let accessed_and_visible = self.access_stats.accessed()
1480 12 : && self.access_stats.visibility() == LayerVisibilityHint::Visible;
1481 54 : if accessed_and_visible {
1482 12 : // Only layers used for reads contribute to our "low residence" metric that is used
1483 12 : // to detect thrashing. Layers promoted for other reasons (e.g. compaction) are allowed
1484 12 : // to be rapidly evicted without contributing to this metric.
1485 12 : timeline
1486 12 : .metrics
1487 12 : .evictions_with_low_residence_duration
1488 12 : .read()
1489 12 : .unwrap()
1490 12 : .observe(elapsed);
1491 42 : }
1492 :
1493 54 : tracing::info!(
1494 0 : residence_millis = elapsed.as_millis(),
1495 0 : accessed_and_visible,
1496 0 : "evicted layer after known residence period"
1497 : );
1498 : }
1499 : Err(_) => {
1500 0 : tracing::info!("evicted layer after unknown residence period");
1501 : }
1502 : }
1503 54 : timeline.metrics.evictions.inc();
1504 54 : timeline
1505 54 : .metrics
1506 54 : .resident_physical_size_sub(self.desc.file_size);
1507 : }
1508 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1509 0 : tracing::error!(
1510 : layer_size = %self.desc.file_size,
1511 0 : "failed to evict layer from disk, it was already gone"
1512 : );
1513 0 : return Err(EvictionCancelled::FileNotFound);
1514 : }
1515 0 : Err(e) => {
1516 0 : // FIXME: this should probably be an abort
1517 0 : tracing::error!("failed to evict file from disk: {e:#}");
1518 0 : return Err(EvictionCancelled::RemoveFailed);
1519 : }
1520 : }
1521 :
1522 54 : self.access_stats.record_residence_event();
1523 54 :
1524 54 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1525 54 :
1526 54 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1527 54 :
1528 54 : Ok(())
1529 54 : }
1530 :
1531 7212 : fn metadata(&self) -> LayerFileMetadata {
1532 7212 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1533 7212 : }
1534 :
1535 : /// Needed to use the entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1536 : ///
1537 : /// Synchronizing with spawned tasks is very complicated otherwise.
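       : ///
       : /// Under `#[cfg(test)]` this relies on the caller already being inside a Tokio runtime
       : /// (the test's entered runtime); outside of tests the future is handed to the dedicated
       : /// `BACKGROUND_RUNTIME` instead.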
1538 90 : fn spawn<F>(fut: F)
1539 90 : where
1540 90 : F: std::future::Future<Output = ()> + Send + 'static,
1541 90 : {
1542 90 : #[cfg(test)]
1543 90 : tokio::task::spawn(fut);
1544 90 : #[cfg(not(test))]
1545 90 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1546 90 : }
1547 :
1548 : /// Needed to use the entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1549 1440 : fn spawn_blocking<F>(f: F)
1550 1440 : where
1551 1440 : F: FnOnce() + Send + 'static,
1552 1440 : {
1553 1440 : #[cfg(test)]
1554 1440 : tokio::task::spawn_blocking(f);
1555 1440 : #[cfg(not(test))]
1556 1440 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1557 1440 : }
1558 : }
1559 :
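       : // The mtime is captured before removing the file so that `evict_blocking` can report how
       : // long the layer was resident (see the low-residence-duration metric handling above).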
1560 54 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1561 54 : let m = path.metadata()?;
1562 54 : let local_layer_mtime = m.modified()?;
1563 54 : std::fs::remove_file(path)?;
1564 54 : Ok(local_layer_mtime)
1565 54 : }
1566 :
1567 0 : #[derive(Debug, thiserror::Error)]
1568 : pub(crate) enum EvictionError {
1569 : #[error("layer was already evicted")]
1570 : NotFound,
1571 :
1572 : /// Evictions must always lose to downloads in races, and this time it happened.
1573 : #[error("layer was downloaded instead")]
1574 : Downloaded,
1575 :
1576 : #[error("eviction did not happen within timeout")]
1577 : Timeout,
1578 : }
1579 :
1580 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1581 0 : #[derive(Debug, thiserror::Error)]
1582 : pub(crate) enum DownloadError {
1583 : #[error("timeline has already shutdown")]
1584 : TimelineShutdown,
1585 : #[error("context denies downloading")]
1586 : ContextAndConfigReallyDeniesDownloads,
1587 : #[error("downloading is really required but not allowed by this method")]
1588 : DownloadRequired,
1589 : #[error("layer path exists, but it is not a file: {0:?}")]
1590 : NotFile(std::fs::FileType),
1591 : /// Why no error here? Because it will be reported by page_service. We should have also done
1592 : /// retries already.
1593 : #[error("downloading evicted layer file failed")]
1594 : DownloadFailed,
1595 : #[error("downloading failed, possibly for shutdown")]
1596 : DownloadCancelled,
1597 : #[error("pre-condition: stat before download failed")]
1598 : PreStatFailed(#[source] std::io::Error),
1599 :
1600 : #[cfg(test)]
1601 : #[error("failpoint: {0:?}")]
1602 : Failpoint(failpoints::FailpointKind),
1603 : }
1604 :
1605 : impl DownloadError {
1606 0 : pub(crate) fn is_cancelled(&self) -> bool {
1607 0 : matches!(self, DownloadError::DownloadCancelled)
1608 0 : }
1609 : }
1610 :
1611 : #[derive(Debug, PartialEq)]
1612 : pub(crate) enum NeedsDownload {
1613 : NotFound,
1614 : NotFile(std::fs::FileType),
1615 : WrongSize { actual: u64, expected: u64 },
1616 : }
1617 :
1618 : impl std::fmt::Display for NeedsDownload {
1619 18 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1620 18 : match self {
1621 18 : NeedsDownload::NotFound => write!(f, "file was not found"),
1622 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1623 0 : NeedsDownload::WrongSize { actual, expected } => {
1624 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1625 : }
1626 : }
1627 18 : }
1628 : }
1629 :
1630 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1631 : pub(crate) struct DownloadedLayer {
1632 : owner: Weak<LayerInner>,
1633 : // Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
1634 : // DownloadedLayer
1635 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
1636 : version: usize,
1637 : }
1638 :
1639 : impl std::fmt::Debug for DownloadedLayer {
1640 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1641 0 : f.debug_struct("DownloadedLayer")
1642 0 : // owner omitted because it is always "Weak"
1643 0 : .field("kind", &self.kind)
1644 0 : .field("version", &self.version)
1645 0 : .finish()
1646 0 : }
1647 : }
1648 :
1649 : impl Drop for DownloadedLayer {
1650 1788 : fn drop(&mut self) {
1651 1788 : if let Some(owner) = self.owner.upgrade() {
1652 72 : owner.on_downloaded_layer_drop(self.version);
1653 1716 : } else {
1654 1716 : // Layer::drop will handle cancelling the eviction; because of drop order and
1655 1716 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1656 1716 : }
1657 1788 : }
1658 : }
1659 :
1660 : impl DownloadedLayer {
1661 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`].
1662 : /// Failure to load the layer is sticky, i.e., future `get()` calls will return
1663 : /// the initial load failure immediately.
1664 : ///
1665 : /// The `owner` parameter is a strong reference to the same `LayerInner` as
1666 : /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up being called,
1667 : /// we will always have the LayerInner on the callstack, so we can just use it.
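       : ///
       : /// A minimal sketch of the "sticky failure" memoization (`load_layer` is a hypothetical
       : /// stand-in for the `DeltaLayerInner`/`ImageLayerInner` load; the block is `ignore`d and
       : /// not compiled as a doctest):
       : ///
       : /// ```ignore
       : /// // The cell stores the whole Result, so the first Err is cached just like an Ok and is
       : /// // returned to every later caller without retrying the load.
       : /// let cell: tokio::sync::OnceCell<anyhow::Result<LayerKind>> = tokio::sync::OnceCell::new();
       : /// let kind = cell.get_or_init(|| async { load_layer().await }).await.as_ref();
       : /// ```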
1668 638617 : async fn get<'a>(
1669 638617 : &'a self,
1670 638617 : owner: &Arc<LayerInner>,
1671 638617 : ctx: &RequestContext,
1672 638617 : ) -> anyhow::Result<&'a LayerKind> {
1673 638617 : let init = || async {
1674 3450 : assert_eq!(
1675 3450 : Weak::as_ptr(&self.owner),
1676 3450 : Arc::as_ptr(owner),
1677 0 : "these are the same, just avoiding the upgrade"
1678 : );
1679 :
1680 3450 : let res = if owner.desc.is_delta {
1681 3132 : let ctx = RequestContextBuilder::extend(ctx)
1682 3132 : .page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
1683 3132 : .build();
1684 3132 : let summary = Some(delta_layer::Summary::expected(
1685 3132 : owner.desc.tenant_shard_id.tenant_id,
1686 3132 : owner.desc.timeline_id,
1687 3132 : owner.desc.key_range.clone(),
1688 3132 : owner.desc.lsn_range.clone(),
1689 3132 : ));
1690 3132 : delta_layer::DeltaLayerInner::load(
1691 3132 : &owner.path,
1692 3132 : summary,
1693 3132 : Some(owner.conf.max_vectored_read_bytes),
1694 3132 : &ctx,
1695 3132 : )
1696 3145 : .await
1697 3132 : .map(LayerKind::Delta)
1698 : } else {
1699 318 : let ctx = RequestContextBuilder::extend(ctx)
1700 318 : .page_content_kind(crate::context::PageContentKind::ImageLayerSummary)
1701 318 : .build();
1702 318 : let lsn = owner.desc.image_layer_lsn();
1703 318 : let summary = Some(image_layer::Summary::expected(
1704 318 : owner.desc.tenant_shard_id.tenant_id,
1705 318 : owner.desc.timeline_id,
1706 318 : owner.desc.key_range.clone(),
1707 318 : lsn,
1708 318 : ));
1709 318 : image_layer::ImageLayerInner::load(
1710 318 : &owner.path,
1711 318 : lsn,
1712 318 : summary,
1713 318 : Some(owner.conf.max_vectored_read_bytes),
1714 318 : &ctx,
1715 318 : )
1716 322 : .await
1717 318 : .map(LayerKind::Image)
1718 : };
1719 :
1720 3450 : match res {
1721 3450 : Ok(layer) => Ok(layer),
1722 0 : Err(err) => {
1723 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1724 0 : // We log this message once over the lifetime of `Self`
1725 0 : // => Ok and good to log backtrace and path here.
1726 0 : tracing::error!(
1727 0 : "layer load failed, assuming permanent failure: {}: {err:?}",
1728 0 : owner.path
1729 : );
1730 0 : Err(err)
1731 : }
1732 : }
1733 6900 : };
1734 638617 : self.kind
1735 638617 : .get_or_init(init)
1736 3467 : .await
1737 638617 : .as_ref()
1738 638617 : // We already logged the full backtrace above, once. Don't repeat that here.
1739 638617 : .map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
1740 638617 : }
1741 :
1742 635707 : async fn get_values_reconstruct_data(
1743 635707 : &self,
1744 635707 : keyspace: KeySpace,
1745 635707 : lsn_range: Range<Lsn>,
1746 635707 : reconstruct_data: &mut ValuesReconstructState,
1747 635707 : owner: &Arc<LayerInner>,
1748 635707 : ctx: &RequestContext,
1749 635707 : ) -> Result<(), GetVectoredError> {
1750 : use LayerKind::*;
1751 :
1752 635707 : match self
1753 635707 : .get(owner, ctx)
1754 2591 : .await
1755 635707 : .map_err(GetVectoredError::Other)?
1756 : {
1757 611508 : Delta(d) => {
1758 611508 : d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
1759 266813 : .await
1760 : }
1761 24199 : Image(i) => {
1762 24199 : i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
1763 16380 : .await
1764 : }
1765 : }
1766 635707 : }
1767 :
1768 12 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1769 : use LayerKind::*;
1770 12 : match self.get(owner, ctx).await? {
1771 12 : Delta(d) => d.dump(ctx).await?,
1772 0 : Image(i) => i.dump(ctx).await?,
1773 : }
1774 :
1775 12 : Ok(())
1776 12 : }
1777 : }
1778 :
1779 : /// Wrapper around an actual layer implementation.
1780 : #[derive(Debug)]
1781 : enum LayerKind {
1782 : Delta(delta_layer::DeltaLayerInner),
1783 : Image(image_layer::ImageLayerInner),
1784 : }
1785 :
1786 : /// Guard for forcing a layer to be resident while it exists.
1787 : #[derive(Clone)]
1788 : pub(crate) struct ResidentLayer {
1789 : owner: Layer,
1790 : downloaded: Arc<DownloadedLayer>,
1791 : }
1792 :
1793 : impl std::fmt::Display for ResidentLayer {
1794 5760 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1795 5760 : write!(f, "{}", self.owner)
1796 5760 : }
1797 : }
1798 :
1799 : impl std::fmt::Debug for ResidentLayer {
1800 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1801 0 : write!(f, "{}", self.owner)
1802 0 : }
1803 : }
1804 :
1805 : impl ResidentLayer {
1806 : /// Release the eviction guard, converting back into a plain [`Layer`].
1807 : ///
1808 : /// You can also access the [`Layer`] by using `as_ref`.
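       : ///
       : /// A rough usage sketch (the method producing the guard is assumed and not part of this
       : /// excerpt):
       : ///
       : /// ```ignore
       : /// // While the `ResidentLayer` guard exists, the layer file cannot be evicted.
       : /// let resident: ResidentLayer = layer.download_and_keep_resident().await?;
       : /// let meta = resident.metadata();
       : /// // Converting back releases the guard and makes the layer evictable again.
       : /// let layer: Layer = resident.drop_eviction_guard();
       : /// ```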
1809 1260 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1810 1260 : self.into()
1811 1260 : }
1812 :
1813 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1814 2412 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1815 : pub(crate) async fn load_keys<'a>(
1816 : &'a self,
1817 : ctx: &RequestContext,
1818 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1819 : use LayerKind::*;
1820 :
1821 : let owner = &self.owner.0;
1822 : match self.downloaded.get(owner, ctx).await? {
1823 : Delta(ref d) => {
1824 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1825 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1826 : // while it's being held.
1827 : self.owner.record_access(ctx);
1828 :
1829 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1830 : .await
1831 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1832 : }
1833 : Image(_) => anyhow::bail!(format!("cannot load_keys on an image layer {self}")),
1834 : }
1835 : }
1836 :
1837 : /// Read all the keys in this layer which match the ShardIdentity, and write them all to
1838 : /// the provided writer. Return the number of keys written.
1839 48 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1840 : pub(crate) async fn filter(
1841 : &self,
1842 : shard_identity: &ShardIdentity,
1843 : writer: &mut ImageLayerWriter,
1844 : ctx: &RequestContext,
1845 : ) -> Result<usize, CompactionError> {
1846 : use LayerKind::*;
1847 :
1848 : match self
1849 : .downloaded
1850 : .get(&self.owner.0, ctx)
1851 : .await
1852 : .map_err(CompactionError::Other)?
1853 : {
1854 : Delta(_) => {
1855 : return Err(CompactionError::Other(anyhow::anyhow!(format!(
1856 : "cannot filter() on a delta layer {self}"
1857 : ))));
1858 : }
1859 : Image(i) => i
1860 : .filter(shard_identity, writer, ctx)
1861 : .await
1862 : .map_err(CompactionError::Other),
1863 : }
1864 : }
1865 :
1866 : /// Returns the amount of keys and values written to the writer.
1867 30 : pub(crate) async fn copy_delta_prefix(
1868 30 : &self,
1869 30 : writer: &mut super::delta_layer::DeltaLayerWriter,
1870 30 : until: Lsn,
1871 30 : ctx: &RequestContext,
1872 30 : ) -> anyhow::Result<usize> {
1873 : use LayerKind::*;
1874 :
1875 30 : let owner = &self.owner.0;
1876 30 :
1877 30 : match self.downloaded.get(owner, ctx).await? {
1878 30 : Delta(ref d) => d
1879 30 : .copy_prefix(writer, until, ctx)
1880 39 : .await
1881 30 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1882 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1883 : }
1884 30 : }
1885 :
1886 5071 : pub(crate) fn local_path(&self) -> &Utf8Path {
1887 5071 : &self.owner.0.path
1888 5071 : }
1889 :
1890 4614 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1891 4614 : self.owner.metadata()
1892 4614 : }
1893 :
1894 : /// Cast the layer to a delta, return an error if it is an image layer.
1895 1524 : pub(crate) async fn get_as_delta(
1896 1524 : &self,
1897 1524 : ctx: &RequestContext,
1898 1524 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1899 : use LayerKind::*;
1900 1524 : match self.downloaded.get(&self.owner.0, ctx).await? {
1901 1524 : Delta(ref d) => Ok(d),
1902 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1903 : }
1904 1524 : }
1905 :
1906 : /// Cast the layer to an image, return an error if it is a delta layer.
1907 114 : pub(crate) async fn get_as_image(
1908 114 : &self,
1909 114 : ctx: &RequestContext,
1910 114 : ) -> anyhow::Result<&image_layer::ImageLayerInner> {
1911 : use LayerKind::*;
1912 114 : match self.downloaded.get(&self.owner.0, ctx).await? {
1913 114 : Image(ref d) => Ok(d),
1914 0 : Delta(_) => Err(anyhow::anyhow!("delta layer")),
1915 : }
1916 114 : }
1917 : }
1918 :
1919 : impl AsLayerDesc for ResidentLayer {
1920 17791 : fn layer_desc(&self) -> &PersistentLayerDesc {
1921 17791 : self.owner.layer_desc()
1922 17791 : }
1923 : }
1924 :
1925 : impl AsRef<Layer> for ResidentLayer {
1926 5784 : fn as_ref(&self) -> &Layer {
1927 5784 : &self.owner
1928 5784 : }
1929 : }
1930 :
1931 : /// Drop the eviction guard.
1932 : impl From<ResidentLayer> for Layer {
1933 1260 : fn from(value: ResidentLayer) -> Self {
1934 1260 : value.owner
1935 1260 : }
1936 : }
1937 :
1938 : use metrics::IntCounter;
1939 :
1940 : pub(crate) struct LayerImplMetrics {
1941 : started_evictions: IntCounter,
1942 : completed_evictions: IntCounter,
1943 : cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
1944 :
1945 : started_deletes: IntCounter,
1946 : completed_deletes: IntCounter,
1947 : failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
1948 :
1949 : rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
1950 : inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
1951 : redownload_after: metrics::Histogram,
1952 : time_to_evict: metrics::Histogram,
1953 : }
1954 :
1955 : impl Default for LayerImplMetrics {
1956 126 : fn default() -> Self {
1957 : use enum_map::Enum;
1958 :
1959 : // reminder: these will be pageserver_layer_* with "_total" suffix
1960 :
1961 126 : let started_evictions = metrics::register_int_counter!(
1962 126 : "pageserver_layer_started_evictions",
1963 126 : "Evictions started in the Layer implementation"
1964 126 : )
1965 126 : .unwrap();
1966 126 : let completed_evictions = metrics::register_int_counter!(
1967 126 : "pageserver_layer_completed_evictions",
1968 126 : "Evictions completed in the Layer implementation"
1969 126 : )
1970 126 : .unwrap();
1971 126 :
1972 126 : let cancelled_evictions = metrics::register_int_counter_vec!(
1973 126 : "pageserver_layer_cancelled_evictions_count",
1974 126 : "Different reasons for evictions to have been cancelled or failed",
1975 126 : &["reason"]
1976 126 : )
1977 126 : .unwrap();
1978 126 :
1979 1134 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1980 1134 : let reason = EvictionCancelled::from_usize(i);
1981 1134 : let s = reason.as_str();
1982 1134 : cancelled_evictions.with_label_values(&[s])
1983 1134 : }));
1984 126 :
1985 126 : let started_deletes = metrics::register_int_counter!(
1986 126 : "pageserver_layer_started_deletes",
1987 126 : "Deletions on drop pending in the Layer implementation"
1988 126 : )
1989 126 : .unwrap();
1990 126 : let completed_deletes = metrics::register_int_counter!(
1991 126 : "pageserver_layer_completed_deletes",
1992 126 : "Deletions on drop completed in the Layer implementation"
1993 126 : )
1994 126 : .unwrap();
1995 126 :
1996 126 : let failed_deletes = metrics::register_int_counter_vec!(
1997 126 : "pageserver_layer_failed_deletes_count",
1998 126 : "Different reasons for deletions on drop to have failed",
1999 126 : &["reason"]
2000 126 : )
2001 126 : .unwrap();
2002 126 :
2003 252 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2004 252 : let reason = DeleteFailed::from_usize(i);
2005 252 : let s = reason.as_str();
2006 252 : failed_deletes.with_label_values(&[s])
2007 252 : }));
2008 126 :
2009 126 : let rare_counters = metrics::register_int_counter_vec!(
2010 126 : "pageserver_layer_assumed_rare_count",
2011 126 : "Times unexpected or assumed rare event happened",
2012 126 : &["event"]
2013 126 : )
2014 126 : .unwrap();
2015 126 :
2016 882 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2017 882 : let event = RareEvent::from_usize(i);
2018 882 : let s = event.as_str();
2019 882 : rare_counters.with_label_values(&[s])
2020 882 : }));
2021 126 :
2022 126 : let inits_cancelled = metrics::register_int_counter!(
2023 126 : "pageserver_layer_inits_cancelled_count",
2024 126 : "Times Layer initialization was cancelled",
2025 126 : )
2026 126 : .unwrap();
2027 126 :
2028 126 : let redownload_after = {
2029 126 : let minute = 60.0;
2030 126 : let hour = 60.0 * minute;
2031 126 : metrics::register_histogram!(
2032 126 : "pageserver_layer_redownloaded_after",
2033 126 : "Time between evicting and re-downloading.",
2034 126 : vec![
2035 126 : 10.0,
2036 126 : 30.0,
2037 126 : minute,
2038 126 : 5.0 * minute,
2039 126 : 15.0 * minute,
2040 126 : 30.0 * minute,
2041 126 : hour,
2042 126 : 12.0 * hour,
2043 126 : ]
2044 126 : )
2045 126 : .unwrap()
2046 126 : };
2047 126 :
2048 126 : let time_to_evict = metrics::register_histogram!(
2049 126 : "pageserver_layer_eviction_held_permit_seconds",
2050 126 : "Time eviction held the permit.",
2051 126 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
2052 126 : )
2053 126 : .unwrap();
2054 126 :
2055 126 : Self {
2056 126 : started_evictions,
2057 126 : completed_evictions,
2058 126 : cancelled_evictions,
2059 126 :
2060 126 : started_deletes,
2061 126 : completed_deletes,
2062 126 : failed_deletes,
2063 126 :
2064 126 : rare_counters,
2065 126 : inits_cancelled,
2066 126 : redownload_after,
2067 126 : time_to_evict,
2068 126 : }
2069 126 : }
2070 : }
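       :
       : // Note on the construction above: each labeled counter vec is registered once and every
       : // enum variant's label is resolved up front into an `enum_map::EnumMap`, so hot paths can
       : // increment e.g. `self.cancelled_evictions[reason]` without a label lookup. A minimal
       : // sketch of the same pattern (the `Reason` enum is illustrative only):
       : //
       : //     let vec = metrics::register_int_counter_vec!("example_total", "help", &["reason"]).unwrap();
       : //     let map: enum_map::EnumMap<Reason, metrics::IntCounter> =
       : //         enum_map::EnumMap::from_array(std::array::from_fn(|i| {
       : //             vec.with_label_values(&[Reason::from_usize(i).as_str()])
       : //         }));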
2071 :
2072 : impl LayerImplMetrics {
2073 78 : fn inc_started_evictions(&self) {
2074 78 : self.started_evictions.inc();
2075 78 : }
2076 54 : fn inc_completed_evictions(&self) {
2077 54 : self.completed_evictions.inc();
2078 54 : }
2079 24 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2080 24 : self.cancelled_evictions[reason].inc()
2081 24 : }
2082 :
2083 1398 : fn inc_started_deletes(&self) {
2084 1398 : self.started_deletes.inc();
2085 1398 : }
2086 1383 : fn inc_completed_deletes(&self) {
2087 1383 : self.completed_deletes.inc();
2088 1383 : }
2089 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2090 0 : self.failed_deletes[reason].inc();
2091 0 : }
2092 :
2093 : /// Counted separately from failed layer deletes because we will complete the layer deletion
2094 : /// attempt regardless of a failure to delete the local file.
2095 0 : fn inc_delete_removes_failed(&self) {
2096 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2097 0 : }
2098 :
2099 : /// Expected to be rare just as cancellations are rare, but we could have cancellations separate
2100 : /// from the single caller which can start the download, so use this counter to separate them.
2101 0 : fn inc_init_completed_without_requester(&self) {
2102 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2103 0 : }
2104 :
2105 : /// Expected to be rare because cancellations are unexpected, and failures are unexpected
2106 0 : fn inc_download_failed_without_requester(&self) {
2107 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2108 0 : }
2109 :
2110 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2111 : ///
2112 : /// If this counter is always zero, we should replace the ResidentOrWantedEvicted type with an
2113 : /// Option.
2114 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2115 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2116 0 : }
2117 :
2118 : /// These are only expected to occur up to about the [`Self::inc_init_cancelled`] amount when
2119 : /// running with remote storage.
2120 18 : fn inc_init_needed_no_download(&self) {
2121 18 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2122 18 : }
2123 :
2124 : /// Expected to be rare because all layer files should be readable and good
2125 0 : fn inc_permanent_loading_failures(&self) {
2126 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2127 0 : }
2128 :
2129 0 : fn inc_init_cancelled(&self) {
2130 0 : self.inits_cancelled.inc()
2131 0 : }
2132 :
2133 18 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2134 18 : self.redownload_after.observe(duration.as_secs_f64())
2135 18 : }
2136 :
2137 : /// This would be bad if it ever happened, or would mean extreme disk pressure. We should
2138 : /// probably cancel the eviction instead if we would have read waiters. We cannot, however,
2139 : /// separate reads from other evictions, so this could be noisy as well.
2140 0 : fn inc_evicted_with_waiters(&self) {
2141 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2142 0 : }
2143 :
2144 : /// Recorded, at least initially, because the permit is now acquired in async context before
2145 : /// the spawn_blocking action.
2146 54 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2147 54 : self.time_to_evict.observe(duration.as_secs_f64())
2148 54 : }
2149 : }
2150 :
2151 : #[derive(Debug, Clone, Copy, enum_map::Enum)]
2152 : enum EvictionCancelled {
2153 : LayerGone,
2154 : TimelineGone,
2155 : VersionCheckFailed,
2156 : FileNotFound,
2157 : RemoveFailed,
2158 : AlreadyReinitialized,
2159 : /// Not evicted because of a pending reinitialization
2160 : LostToDownload,
2161 : /// After eviction, there was a new layer access which cancelled the eviction.
2162 : UpgradedBackOnAccess,
2163 : UnexpectedEvictedState,
2164 : }
2165 :
2166 : impl EvictionCancelled {
2167 1134 : fn as_str(&self) -> &'static str {
2168 1134 : match self {
2169 126 : EvictionCancelled::LayerGone => "layer_gone",
2170 126 : EvictionCancelled::TimelineGone => "timeline_gone",
2171 126 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2172 126 : EvictionCancelled::FileNotFound => "file_not_found",
2173 126 : EvictionCancelled::RemoveFailed => "remove_failed",
2174 126 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2175 126 : EvictionCancelled::LostToDownload => "lost_to_download",
2176 126 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2177 126 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2178 : }
2179 1134 : }
2180 : }
2181 :
2182 : #[derive(enum_map::Enum)]
2183 : enum DeleteFailed {
2184 : TimelineGone,
2185 : DeleteSchedulingFailed,
2186 : }
2187 :
2188 : impl DeleteFailed {
2189 252 : fn as_str(&self) -> &'static str {
2190 252 : match self {
2191 126 : DeleteFailed::TimelineGone => "timeline_gone",
2192 126 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2193 : }
2194 252 : }
2195 : }
2196 :
2197 : #[derive(enum_map::Enum)]
2198 : enum RareEvent {
2199 : RemoveOnDropFailed,
2200 : InitCompletedWithoutRequester,
2201 : DownloadFailedWithoutRequester,
2202 : UpgradedWantedEvicted,
2203 : InitWithoutDownload,
2204 : PermanentLoadingFailure,
2205 : EvictedWithWaiters,
2206 : }
2207 :
2208 : impl RareEvent {
2209 882 : fn as_str(&self) -> &'static str {
2210 : use RareEvent::*;
2211 :
2212 882 : match self {
2213 126 : RemoveOnDropFailed => "remove_on_drop_failed",
2214 126 : InitCompletedWithoutRequester => "init_completed_without",
2215 126 : DownloadFailedWithoutRequester => "download_failed_without",
2216 126 : UpgradedWantedEvicted => "raced_wanted_evicted",
2217 126 : InitWithoutDownload => "init_needed_no_download",
2218 126 : PermanentLoadingFailure => "permanent_loading_failure",
2219 126 : EvictedWithWaiters => "evicted_with_waiters",
2220 : }
2221 882 : }
2222 : }
2223 :
2224 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2225 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|