Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::HistoricLayerInfo;
5 : use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
6 : use std::ops::Range;
7 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 : use std::sync::{Arc, Weak};
9 : use std::time::{Duration, SystemTime};
10 : use tracing::Instrument;
11 : use utils::id::TimelineId;
12 : use utils::lsn::Lsn;
13 : use utils::sync::{gate, heavier_once_cell};
14 :
15 : use crate::config::PageServerConf;
16 : use crate::context::{DownloadBehavior, RequestContext};
17 : use crate::repository::Key;
18 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
19 : use crate::task_mgr::TaskKind;
20 : use crate::tenant::timeline::{CompactionError, GetVectoredError};
21 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
22 :
23 : use super::delta_layer::{self, DeltaEntry};
24 : use super::image_layer::{self};
25 : use super::{
26 : AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
27 : LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
28 : };
29 :
30 : use utils::generation::Generation;
31 :
32 : #[cfg(test)]
33 : mod tests;
34 :
35 : #[cfg(test)]
36 : mod failpoints;
37 :
38 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
39 : /// range of LSNs.
40 : ///
41 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
42 : /// layers are used to ingest incoming WAL, and provide fast access to the
43 : /// recent page versions. On-disk layers are stored as files on disk, and are
44 : /// immutable. This type represents the on-disk kind, while the in-memory kind is represented
45 : /// by [`InMemoryLayer`].
46 : ///
47 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
48 : /// A delta layer contains all modifications within a range of LSNs and keys.
49 : /// An image layer is a snapshot of all the data in a key-range, at a single
50 : /// LSN.
51 : ///
52 : /// This type models the on-disk layers, which can be evicted and downloaded on demand. As a
53 : /// general goal, read accesses should always win over eviction, and eviction should not wait
54 : /// for download.
55 : ///
56 : /// ### State transitions
57 : ///
58 : /// The internal state of `Layer` is composed of, most importantly, the on-filesystem state and
59 : /// the [`ResidentOrWantedEvicted`] enum. The on-filesystem state can be either present (fully
60 : /// downloaded, right size) or deleted.
61 : ///
62 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
63 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
64 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
65 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
66 : /// cancel the eviction.
67 : ///
68 : /// ```text
69 : /// +-----------------+ get_or_maybe_download +--------------------------------+
70 : /// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
71 : /// | ENOENT | /->| |
72 : /// +-----------------+ | +--------------------------------+
73 : /// ^ | | ^
74 : /// | get_or_maybe_download | | | get_or_maybe_download, either:
75 : /// evict_blocking | /-------------------------/ | | - upgrade weak to strong
76 : /// | | | | - re-initialize without download
77 : /// | | evict_and_wait | |
78 : /// +-----------------+ v |
79 : /// | not initialized | on_downloaded_layer_drop +--------------------------------------+
80 : /// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
81 : /// +-----------------+ +--------------------------------------+
82 : /// ```
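///
/// A caller-side sketch of the "reads win" rule above (hypothetical `layer` binding, not a
/// doctest; both methods are defined below on [`Layer`]):
///
/// ```ignore
/// // Downloads if needed and returns a guard that keeps the layer resident.
/// let resident: ResidentLayer = layer.download_and_keep_resident().await?;
///
/// // While the guard (or any in-flight read) holds a strong reference to the
/// // DownloadedLayer, a concurrent eviction request cannot complete and times out.
/// assert!(matches!(
///     layer.evict_and_wait(std::time::Duration::from_secs(1)).await,
///     Err(EvictionError::Timeout)
/// ));
/// ```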
83 : ///
84 : /// ### Unsupported
85 : ///
86 : /// - Evicting by the operator deleting files from the filesystem
87 : ///
88 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
89 : #[derive(Clone)]
90 : pub(crate) struct Layer(Arc<LayerInner>);
91 :
92 : impl std::fmt::Display for Layer {
93 1926 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 1926 : write!(
95 1926 : f,
96 1926 : "{}{}",
97 1926 : self.layer_desc().short_id(),
98 1926 : self.0.generation.get_suffix()
99 1926 : )
100 1926 : }
101 : }
102 :
103 : impl std::fmt::Debug for Layer {
104 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 0 : write!(f, "{}", self)
106 0 : }
107 : }
108 :
109 : impl AsLayerDesc for Layer {
110 502103 : fn layer_desc(&self) -> &PersistentLayerDesc {
111 502103 : self.0.layer_desc()
112 502103 : }
113 : }
114 :
115 : impl PartialEq for Layer {
116 3 : fn eq(&self, other: &Self) -> bool {
117 3 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
118 3 : }
119 : }
120 :
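/// Returns the local path for a layer file within the timeline directory.
///
/// A sketch of the two path shapes produced below (names are placeholders; the exact suffix
/// comes from `Generation::get_suffix`):
///
/// ```text
/// no generation:     <timeline_path>/<layer_file_name>
/// with a generation: <timeline_path>/<layer_file_name>-v1<generation_suffix>
/// ```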
121 1690 : pub(crate) fn local_layer_path(
122 1690 : conf: &PageServerConf,
123 1690 : tenant_shard_id: &TenantShardId,
124 1690 : timeline_id: &TimelineId,
125 1690 : layer_file_name: &LayerName,
126 1690 : generation: &Generation,
127 1690 : ) -> Utf8PathBuf {
128 1690 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
129 1690 :
130 1690 : if generation.is_none() {
131 : // Without a generation, we may only use legacy path style
132 0 : timeline_path.join(layer_file_name.to_string())
133 : } else {
134 1690 : timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
135 : }
136 1690 : }
137 :
138 : impl Layer {
139 : /// Creates a Layer value for a file we know not to be resident.
140 0 : pub(crate) fn for_evicted(
141 0 : conf: &'static PageServerConf,
142 0 : timeline: &Arc<Timeline>,
143 0 : file_name: LayerName,
144 0 : metadata: LayerFileMetadata,
145 0 : ) -> Self {
146 0 : let local_path = local_layer_path(
147 0 : conf,
148 0 : &timeline.tenant_shard_id,
149 0 : &timeline.timeline_id,
150 0 : &file_name,
151 0 : &metadata.generation,
152 0 : );
153 0 :
154 0 : let desc = PersistentLayerDesc::from_filename(
155 0 : timeline.tenant_shard_id,
156 0 : timeline.timeline_id,
157 0 : file_name,
158 0 : metadata.file_size,
159 0 : );
160 0 :
161 0 : let owner = Layer(Arc::new(LayerInner::new(
162 0 : conf,
163 0 : timeline,
164 0 : local_path,
165 0 : desc,
166 0 : None,
167 0 : metadata.generation,
168 0 : metadata.shard,
169 0 : )));
170 0 :
171 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
172 :
173 0 : owner
174 0 : }
175 :
176 : /// Creates a Layer value for a file we know to be resident in the timeline directory.
177 24 : pub(crate) fn for_resident(
178 24 : conf: &'static PageServerConf,
179 24 : timeline: &Arc<Timeline>,
180 24 : local_path: Utf8PathBuf,
181 24 : file_name: LayerName,
182 24 : metadata: LayerFileMetadata,
183 24 : ) -> ResidentLayer {
184 24 : let desc = PersistentLayerDesc::from_filename(
185 24 : timeline.tenant_shard_id,
186 24 : timeline.timeline_id,
187 24 : file_name,
188 24 : metadata.file_size,
189 24 : );
190 24 :
191 24 : let mut resident = None;
192 24 :
193 24 : let owner = Layer(Arc::new_cyclic(|owner| {
194 24 : let inner = Arc::new(DownloadedLayer {
195 24 : owner: owner.clone(),
196 24 : kind: tokio::sync::OnceCell::default(),
197 24 : version: 0,
198 24 : });
199 24 : resident = Some(inner.clone());
200 24 :
201 24 : LayerInner::new(
202 24 : conf,
203 24 : timeline,
204 24 : local_path,
205 24 : desc,
206 24 : Some(inner),
207 24 : metadata.generation,
208 24 : metadata.shard,
209 24 : )
210 24 : }));
211 24 :
212 24 : let downloaded = resident.expect("just initialized");
213 24 :
214 24 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
215 :
216 24 : timeline
217 24 : .metrics
218 24 : .resident_physical_size_add(metadata.file_size);
219 24 :
220 24 : ResidentLayer { downloaded, owner }
221 24 : }
222 :
223 : /// Creates a Layer value for a freshly written new layer file by renaming it from a
224 : /// temporary path.
225 1682 : pub(crate) fn finish_creating(
226 1682 : conf: &'static PageServerConf,
227 1682 : timeline: &Arc<Timeline>,
228 1682 : desc: PersistentLayerDesc,
229 1682 : temp_path: &Utf8Path,
230 1682 : ) -> anyhow::Result<ResidentLayer> {
231 1682 : let mut resident = None;
232 1682 :
233 1682 : let owner = Layer(Arc::new_cyclic(|owner| {
234 1682 : let inner = Arc::new(DownloadedLayer {
235 1682 : owner: owner.clone(),
236 1682 : kind: tokio::sync::OnceCell::default(),
237 1682 : version: 0,
238 1682 : });
239 1682 : resident = Some(inner.clone());
240 1682 :
241 1682 : let local_path = local_layer_path(
242 1682 : conf,
243 1682 : &timeline.tenant_shard_id,
244 1682 : &timeline.timeline_id,
245 1682 : &desc.layer_name(),
246 1682 : &timeline.generation,
247 1682 : );
248 1682 :
249 1682 : LayerInner::new(
250 1682 : conf,
251 1682 : timeline,
252 1682 : local_path,
253 1682 : desc,
254 1682 : Some(inner),
255 1682 : timeline.generation,
256 1682 : timeline.get_shard_index(),
257 1682 : )
258 1682 : }));
259 1682 :
260 1682 : let downloaded = resident.expect("just initialized");
261 1682 :
262 1682 : // We never want to overwrite an existing file, so we use `RENAME_NOREPLACE`.
263 1682 : // TODO: this leaves the temp file in place if the rename fails, risking us running
264 1682 : // out of space. Should we clean it up here or does the calling context deal with this?
265 1682 : utils::fs_ext::rename_noreplace(temp_path.as_std_path(), owner.local_path().as_std_path())
266 1682 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
267 :
268 1682 : Ok(ResidentLayer { downloaded, owner })
269 1682 : }
270 :
271 : /// Requests the layer to be evicted and waits for this to be done.
272 : ///
273 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
274 : ///
275 : /// If, due to bad luck or a blocked executor, we miss the actual eviction and the layer is
276 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
277 : ///
278 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
279 : /// will happen regardless of whether the future returned by this method completes, unless
280 : /// there is a read access before the eviction gets to complete.
281 : ///
282 : /// Technically cancellation safe, but cancelling and retrying might shift which generation of
283 : /// the download-evict cycle is observed.
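///
/// A minimal usage sketch (hypothetical `layer` binding and timeout; not a doctest):
///
/// ```ignore
/// match layer.evict_and_wait(std::time::Duration::from_secs(10)).await {
///     Ok(()) => { /* the local file was removed */ }
///     Err(EvictionError::NotFound) => { /* the layer was not resident to begin with */ }
///     Err(EvictionError::Downloaded) => { /* a read won the race; the file is back on disk */ }
///     Err(EvictionError::Timeout) => { /* eviction did not complete within the timeout */ }
/// }
/// ```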
284 36 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
285 50 : self.0.evict_and_wait(timeout).await
286 32 : }
287 :
288 : /// Delete the layer file when `self` gets dropped, and also try to schedule a remote index
289 : /// upload then.
290 : ///
291 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
292 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
293 : /// the value this is called on gets dropped.
294 : ///
295 : /// This is ensured by both of those methods accepting references to Layer.
296 : ///
297 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
298 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
299 462 : pub(crate) fn delete_on_drop(&self) {
300 462 : self.0.delete_on_drop();
301 462 : }
302 :
303 212190 : pub(crate) async fn get_values_reconstruct_data(
304 212190 : &self,
305 212190 : keyspace: KeySpace,
306 212190 : lsn_range: Range<Lsn>,
307 212190 : reconstruct_data: &mut ValuesReconstructState,
308 212190 : ctx: &RequestContext,
309 212190 : ) -> Result<(), GetVectoredError> {
310 212190 : let layer = self
311 212190 : .0
312 212190 : .get_or_maybe_download(true, Some(ctx))
313 4 : .await
314 212190 : .map_err(|err| match err {
315 : DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
316 0 : GetVectoredError::Cancelled
317 : }
318 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
319 212190 : })?;
320 :
321 212190 : self.record_access(ctx);
322 212190 :
323 212190 : layer
324 212190 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
325 212190 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
326 98396 : .await
327 212190 : .map_err(|err| match err {
328 0 : GetVectoredError::Other(err) => GetVectoredError::Other(
329 0 : err.context(format!("get_values_reconstruct_data for layer {self}")),
330 0 : ),
331 0 : err => err,
332 212190 : })
333 212190 : }
334 :
335 : /// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
336 : #[allow(dead_code)]
337 0 : pub(crate) async fn load_key_values(
338 0 : &self,
339 0 : ctx: &RequestContext,
340 0 : ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
341 0 : let layer = self
342 0 : .0
343 0 : .get_or_maybe_download(true, Some(ctx))
344 0 : .await
345 0 : .map_err(|err| match err {
346 0 : DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
347 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
348 0 : })?;
349 0 : layer.load_key_values(&self.0, ctx).await
350 0 : }
351 :
352 : /// Download the layer if evicted.
353 : ///
354 : /// Will not error when the layer is already downloaded.
355 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
356 0 : self.0.get_or_maybe_download(true, None).await?;
357 0 : Ok(())
358 0 : }
359 :
360 : /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
361 : /// while the guard exists.
362 : ///
363 : /// Returns None if the layer is currently evicted or becoming evicted.
364 : #[cfg(test)]
365 20 : pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
366 20 : let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
367 :
368 14 : Some(ResidentLayer {
369 14 : downloaded,
370 14 : owner: self.clone(),
371 14 : })
372 20 : }
373 :
374 : /// Weak indicator of whether the layer is resident or not. Good enough for eviction, which
375 : /// can deal with `EvictionError::NotFound`.
376 : ///
377 : /// Returns `true` if this layer might be resident, or `false` if it is most likely evicted or
378 : /// will be soon unless a read happens.
379 53 : pub(crate) fn is_likely_resident(&self) -> bool {
380 53 : self.0
381 53 : .inner
382 53 : .get()
383 53 : .map(|rowe| rowe.is_likely_resident())
384 53 : .unwrap_or(false)
385 53 : }
386 :
387 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
388 494 : pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
389 494 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
390 :
391 494 : Ok(ResidentLayer {
392 494 : downloaded,
393 494 : owner: self.clone(),
394 494 : })
395 494 : }
396 :
397 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
398 0 : self.0.info(reset)
399 0 : }
400 :
401 8 : pub(crate) fn latest_activity(&self) -> SystemTime {
402 8 : self.0.access_stats.latest_activity()
403 8 : }
404 :
405 10 : pub(crate) fn visibility(&self) -> LayerVisibilityHint {
406 10 : self.0.access_stats.visibility()
407 10 : }
408 :
409 1684 : pub(crate) fn local_path(&self) -> &Utf8Path {
410 1684 : &self.0.path
411 1684 : }
412 :
413 1958 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
414 1958 : self.0.metadata()
415 1958 : }
416 :
417 0 : pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
418 0 : self.0
419 0 : .timeline
420 0 : .upgrade()
421 0 : .map(|timeline| timeline.timeline_id)
422 0 : }
423 :
424 : /// Traditional debug dumping facility
425 : #[allow(unused)]
426 4 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
427 4 : self.0.desc.dump();
428 4 :
429 4 : if verbose {
430 : // for now, unconditionally download everything, even if that might not be wanted.
431 4 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
432 8 : l.dump(&self.0, ctx).await?
433 0 : }
434 :
435 4 : Ok(())
436 4 : }
437 :
438 : /// Waits until this layer has been dropped (and if needed, local file deletion and remote
439 : /// deletion scheduling has completed).
440 : ///
441 : /// Does not start local deletion; use [`Self::delete_on_drop`] for that
442 : /// separately.
443 : #[cfg(any(feature = "testing", test))]
444 2 : pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
445 2 : let mut rx = self.0.status.as_ref().unwrap().subscribe();
446 :
447 2 : async move {
448 : loop {
449 6 : if rx.changed().await.is_err() {
450 2 : break;
451 0 : }
452 : }
453 2 : }
454 2 : }
455 :
456 212592 : fn record_access(&self, ctx: &RequestContext) {
457 212592 : if self.0.access_stats.record_access(ctx) {
458 : // Visibility was modified to Visible
459 0 : tracing::info!(
460 0 : "Layer {} became visible as a result of access",
461 0 : self.0.desc.key()
462 : );
463 0 : if let Some(tl) = self.0.timeline.upgrade() {
464 0 : tl.metrics
465 0 : .visible_physical_size_gauge
466 0 : .add(self.0.desc.file_size)
467 0 : }
468 212592 : }
469 212592 : }
470 :
471 2502 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
472 2502 : let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
473 2502 : use LayerVisibilityHint::*;
474 2502 : match (old_visibility, visibility) {
475 : (Visible, Covered) => {
476 : // Subtract this layer's contribution to the visible size metric
477 36 : if let Some(tl) = self.0.timeline.upgrade() {
478 36 : debug_assert!(
479 36 : tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
480 : );
481 36 : tl.metrics
482 36 : .visible_physical_size_gauge
483 36 : .sub(self.0.desc.file_size)
484 0 : }
485 : }
486 : (Covered, Visible) => {
487 : // Add this layer's contribution to the visible size metric
488 0 : if let Some(tl) = self.0.timeline.upgrade() {
489 0 : tl.metrics
490 0 : .visible_physical_size_gauge
491 0 : .add(self.0.desc.file_size)
492 0 : }
493 : }
494 2466 : (Covered, Covered) | (Visible, Visible) => {
495 2466 : // no change
496 2466 : }
497 : }
498 2502 : }
499 : }
500 :
501 : /// The downloaded state ([`DownloadedLayer`]) can be either resident or wanted evicted.
502 : ///
503 : /// However, when we want something evicted, we cannot evict it right away, as there might be
504 : /// current reads happening on it. For example: it has been returned by [`LayerMap::search`] but
505 : /// not yet read with [`Layer::get_values_reconstruct_data`].
506 : ///
507 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
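///
/// The mechanism underneath is plain `Arc`/`Weak` reference counting; a self-contained
/// illustration of the downgrade-then-maybe-upgrade dance (standard library only, not this
/// type's API):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let strong = Arc::new("downloaded layer");
/// let weak = Arc::downgrade(&strong);   // "WantedEvicted": eviction has been requested
/// assert!(weak.upgrade().is_some());    // a read can still cancel it by upgrading
/// drop(strong);                         // the last reader goes away
/// assert!(weak.upgrade().is_none());    // now the eviction can actually proceed
/// ```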
508 : #[derive(Debug)]
509 : enum ResidentOrWantedEvicted {
510 : Resident(Arc<DownloadedLayer>),
511 : WantedEvicted(Weak<DownloadedLayer>, usize),
512 : }
513 :
514 : impl ResidentOrWantedEvicted {
515 : /// Non-mutating access to the DownloadedLayer, if possible.
516 : ///
517 : /// This is not used on the read path (anything that calls
518 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
519 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
520 : #[cfg(test)]
521 14 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
522 14 : match self {
523 14 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
524 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
525 : }
526 14 : }
527 :
528 : /// Best-effort query for residency right now; not as strong a guarantee as receiving a strong
529 : /// reference from `ResidentOrWantedEvicted::get`.
530 47 : fn is_likely_resident(&self) -> bool {
531 47 : match self {
532 41 : ResidentOrWantedEvicted::Resident(_) => true,
533 6 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
534 : }
535 47 : }
536 :
537 : /// Upgrades any weak to strong if possible.
538 : ///
539 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
540 : /// happened.
541 212692 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
542 212692 : match self {
543 212684 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
544 8 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
545 0 : Some(strong) => {
546 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
547 0 :
548 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
549 0 :
550 0 : Some((strong, true))
551 : }
552 8 : None => None,
553 : },
554 : }
555 212692 : }
556 :
557 : /// When eviction is first requested, drop down to holding a [`Weak`].
558 : ///
559 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
560 : /// drop the possibly last strong reference outside of the mutex of
561 : /// [`heavier_once_cell::OnceCell`].
562 30 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
563 30 : match self {
564 26 : ResidentOrWantedEvicted::Resident(strong) => {
565 26 : let weak = Arc::downgrade(strong);
566 26 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
567 26 : std::mem::swap(self, &mut temp);
568 26 : match temp {
569 26 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
570 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
571 : }
572 : }
573 4 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
574 : }
575 30 : }
576 : }
577 :
578 : struct LayerInner {
579 : /// Only needed to check `ondemand_download_behavior_treat_error_as_warn` and for creation of
580 : /// [`Self::path`].
581 : conf: &'static PageServerConf,
582 :
583 : /// Full path to the file; unclear if this should exist anymore.
584 : path: Utf8PathBuf,
585 :
586 : desc: PersistentLayerDesc,
587 :
588 : /// Timeline access is needed for remote timeline client and metrics.
589 : ///
590 : /// There should not be an access to timeline for any reason without entering the
591 : /// [`Timeline::gate`] at the same time.
592 : timeline: Weak<Timeline>,
593 :
594 : access_stats: LayerAccessStats,
595 :
596 : /// This custom OnceCell is backed by std mutex, but only held for short time periods.
597 : ///
598 : /// Filesystem changes (download, evict) are only done while holding a permit which the
599 : /// `heavier_once_cell` provides.
600 : ///
601 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
602 : /// possibly read while not holding it.
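///
/// A condensed sketch of the access pattern used further below (simplified; error handling and
/// the read-path upgrade are elided):
///
/// ```ignore
/// match self.inner.get_or_init_detached().await {
///     // already initialized: the guard gives access to the ResidentOrWantedEvicted value
///     Ok(guard) => { /* read, upgrade, or take_and_deinit while holding the guard */ }
///     // not initialized: we hold the InitPermit, may touch the filesystem without races,
///     // and finally publish the value
///     Err(permit) => { /* download or stat, then */ self.inner.set(value, permit); }
/// }
/// ```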
603 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
604 :
605 : /// Do we want to delete locally and remotely this when `LayerInner` is dropped
606 : wanted_deleted: AtomicBool,
607 :
608 : /// Version is to make sure we will only evict a specific initialization of the downloaded file.
609 : ///
610 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
611 : /// `ResidentOrWantedEvicted::WantedEvicted`.
612 : version: AtomicUsize,
613 :
614 : /// Allows subscribing to the moments when the layer actually gets evicted, or when a
615 : /// non-cancellable download starts or completes.
616 : ///
617 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
618 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
619 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
620 : /// [`ResidentOrWantedEvicted::Resident`] on access.
621 : ///
622 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
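///
/// A sketch of how the channel is consumed (mirroring `evict_and_wait` and
/// `wait_for_turn_and_evict` below):
///
/// ```ignore
/// let mut rx = self.status.as_ref().unwrap().subscribe();
/// // ... later, after rx.changed().await ...
/// match &*rx.borrow_and_update() {
///     Status::Resident => { /* a download completed or an eviction was cancelled by a read */ }
///     Status::Evicted => { /* the local file has been removed */ }
///     Status::Downloading => { /* a non-cancellable download has started */ }
/// }
/// ```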
623 : status: Option<tokio::sync::watch::Sender<Status>>,
624 :
625 : /// Counter for exponential backoff with the download.
626 : ///
627 : /// This is atomic only for the purposes of having additional data only accessed while holding
628 : /// the InitPermit.
629 : consecutive_failures: AtomicUsize,
630 :
631 : /// The generation of this Layer.
632 : ///
633 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
634 : /// for created layers from [`Timeline::generation`].
635 : generation: Generation,
636 :
637 : /// The shard of this Layer.
638 : ///
639 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
640 : /// current `ShardIdentity` (TODO: add link once it's introduced).
641 : ///
642 : /// For loaded layers, this may be some other value if the tenant has undergone
643 : /// a shard split since the layer was originally written.
644 : shard: ShardIndex,
645 :
646 : /// When the Layer was last evicted but has not been downloaded since.
647 : ///
648 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
649 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
650 :
651 : #[cfg(test)]
652 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
653 : }
654 :
655 : impl std::fmt::Display for LayerInner {
656 30 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
657 30 : write!(f, "{}", self.layer_desc().short_id())
658 30 : }
659 : }
660 :
661 : impl AsLayerDesc for LayerInner {
662 504413 : fn layer_desc(&self) -> &PersistentLayerDesc {
663 504413 : &self.desc
664 504413 : }
665 : }
666 :
667 : #[derive(Debug, Clone, Copy)]
668 : enum Status {
669 : Resident,
670 : Evicted,
671 : Downloading,
672 : }
673 :
674 : impl Drop for LayerInner {
675 550 : fn drop(&mut self) {
676 : // if there was a pending eviction, mark it cancelled here to balance metrics
677 550 : if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
678 2 : {
679 2 : // eviction has already been started
680 2 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
681 2 :
682 2 : // eviction request is intentionally not honored as no one is present to wait for it
683 2 : // and we could be delaying shutdown for nothing.
684 548 : }
685 :
686 550 : if let Some(timeline) = self.timeline.upgrade() {
687 : // Only need to decrement metrics if the timeline still exists: otherwise
688 : // it will have already de-registered these metrics via TimelineMetrics::shutdown
689 534 : if self.desc.is_delta() {
690 481 : timeline.metrics.layer_count_delta.dec();
691 481 : timeline.metrics.layer_size_delta.sub(self.desc.file_size);
692 481 : } else {
693 53 : timeline.metrics.layer_count_image.dec();
694 53 : timeline.metrics.layer_size_image.sub(self.desc.file_size);
695 53 : }
696 :
697 534 : if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
698 528 : debug_assert!(
699 528 : timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
700 : );
701 528 : timeline
702 528 : .metrics
703 528 : .visible_physical_size_gauge
704 528 : .sub(self.desc.file_size);
705 6 : }
706 16 : }
707 :
708 550 : if !*self.wanted_deleted.get_mut() {
709 94 : return;
710 456 : }
711 :
712 456 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
713 :
714 456 : let path = std::mem::take(&mut self.path);
715 456 : let file_name = self.layer_desc().layer_name();
716 456 : let file_size = self.layer_desc().file_size;
717 456 : let timeline = self.timeline.clone();
718 456 : let meta = self.metadata();
719 456 : let status = self.status.take();
720 456 :
721 456 : Self::spawn_blocking(move || {
722 456 : let _g = span.entered();
723 456 :
724 456 : // carry this until we are finished for [`Layer::wait_drop`] support
725 456 : let _status = status;
726 :
727 456 : let Some(timeline) = timeline.upgrade() else {
728 : // no need to nag that timeline is gone: under normal situation on
729 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
730 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
731 0 : return;
732 : };
733 :
734 456 : let Ok(_guard) = timeline.gate.enter() else {
735 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
736 0 : return;
737 : };
738 :
739 456 : let removed = match std::fs::remove_file(path) {
740 454 : Ok(()) => true,
741 2 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
742 2 : // until we no longer do detaches by removing all local files before removing the
743 2 : // tenant from the global map, we will always get these errors even if we knew the
744 2 : // latest state.
745 2 : //
746 2 : // we currently do not track the latest state, so we'll also end up here on evicted
747 2 : // layers.
748 2 : false
749 : }
750 0 : Err(e) => {
751 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
752 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
753 0 : false
754 : }
755 : };
756 :
757 456 : if removed {
758 454 : timeline.metrics.resident_physical_size_sub(file_size);
759 454 : }
760 456 : let res = timeline
761 456 : .remote_client
762 456 : .schedule_deletion_of_unlinked(vec![(file_name, meta)]);
763 :
764 456 : if let Err(e) = res {
765 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
766 : // demonstrating this deadlock (without spawn_blocking): stop will drop
767 : // queued items, which will have ResidentLayer's, and those drops would try
768 : // to re-entrantly lock the RemoteTimelineClient inner state.
769 0 : if !timeline.is_active() {
770 0 : tracing::info!("scheduling deletion on drop failed: {e:#}");
771 : } else {
772 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
773 : }
774 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
775 456 : } else {
776 456 : LAYER_IMPL_METRICS.inc_completed_deletes();
777 456 : }
778 456 : });
779 550 : }
780 : }
781 :
782 : impl LayerInner {
783 : #[allow(clippy::too_many_arguments)]
784 1706 : fn new(
785 1706 : conf: &'static PageServerConf,
786 1706 : timeline: &Arc<Timeline>,
787 1706 : local_path: Utf8PathBuf,
788 1706 : desc: PersistentLayerDesc,
789 1706 : downloaded: Option<Arc<DownloadedLayer>>,
790 1706 : generation: Generation,
791 1706 : shard: ShardIndex,
792 1706 : ) -> Self {
793 1706 : let (inner, version, init_status) = if let Some(inner) = downloaded {
794 1706 : let version = inner.version;
795 1706 : let resident = ResidentOrWantedEvicted::Resident(inner);
796 1706 : (
797 1706 : heavier_once_cell::OnceCell::new(resident),
798 1706 : version,
799 1706 : Status::Resident,
800 1706 : )
801 : } else {
802 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
803 : };
804 :
805 : // This object acts as a RAII guard on these metrics: increment on construction
806 1706 : if desc.is_delta() {
807 1418 : timeline.metrics.layer_count_delta.inc();
808 1418 : timeline.metrics.layer_size_delta.add(desc.file_size);
809 1418 : } else {
810 288 : timeline.metrics.layer_count_image.inc();
811 288 : timeline.metrics.layer_size_image.add(desc.file_size);
812 288 : }
813 :
814 : // New layers are visible by default. This metric is later updated on drop or in set_visibility
815 1706 : timeline
816 1706 : .metrics
817 1706 : .visible_physical_size_gauge
818 1706 : .add(desc.file_size);
819 1706 :
820 1706 : LayerInner {
821 1706 : conf,
822 1706 : path: local_path,
823 1706 : desc,
824 1706 : timeline: Arc::downgrade(timeline),
825 1706 : access_stats: Default::default(),
826 1706 : wanted_deleted: AtomicBool::new(false),
827 1706 : inner,
828 1706 : version: AtomicUsize::new(version),
829 1706 : status: Some(tokio::sync::watch::channel(init_status).0),
830 1706 : consecutive_failures: AtomicUsize::new(0),
831 1706 : generation,
832 1706 : shard,
833 1706 : last_evicted_at: std::sync::Mutex::default(),
834 1706 : #[cfg(test)]
835 1706 : failpoints: Default::default(),
836 1706 : }
837 1706 : }
838 :
839 462 : fn delete_on_drop(&self) {
840 462 : let res =
841 462 : self.wanted_deleted
842 462 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
843 462 :
844 462 : if res.is_ok() {
845 458 : LAYER_IMPL_METRICS.inc_started_deletes();
846 458 : }
847 462 : }
848 :
849 : /// Cancellation safe; however, dropping the future and calling this method again might result
850 : /// in a new attempt to evict OR join the previously started attempt.
851 108 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
852 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
853 : let mut rx = self.status.as_ref().unwrap().subscribe();
854 :
855 : {
856 : let current = rx.borrow_and_update();
857 : match &*current {
858 : Status::Resident => {
859 : // we might get lucky and evict this; continue
860 : }
861 : Status::Evicted | Status::Downloading => {
862 : // it is already evicted
863 : return Err(EvictionError::NotFound);
864 : }
865 : }
866 : }
867 :
868 : let strong = {
869 : match self.inner.get() {
870 : Some(mut either) => either.downgrade(),
871 : None => {
872 : // we already have a scheduled eviction, which just has not gotten to run yet.
873 : // it might still race with a read access, but that could also get cancelled,
874 : // so let's say this is not evictable.
875 : return Err(EvictionError::NotFound);
876 : }
877 : }
878 : };
879 :
880 : if strong.is_some() {
881 : // drop the DownloadedLayer outside of the holding the guard
882 : drop(strong);
883 :
884 : // idea here is that only one evicter should ever get to witness a strong reference,
885 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
886 : // cancelled eviction and signal us, like it currently does.
887 : //
888 : // a second concurrent evict_and_wait will not see a strong reference.
889 : LAYER_IMPL_METRICS.inc_started_evictions();
890 : }
891 :
892 : let changed = rx.changed();
893 : let changed = tokio::time::timeout(timeout, changed).await;
894 :
895 : let Ok(changed) = changed else {
896 : return Err(EvictionError::Timeout);
897 : };
898 :
899 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
900 :
901 : let current = rx.borrow_and_update();
902 :
903 : match &*current {
904 : // the easiest case
905 : Status::Evicted => Ok(()),
906 : // it surely was evicted in between, but then there was a new access now; we can't know
907 : // if it'll succeed so lets just call it evicted
908 : Status::Downloading => Ok(()),
909 : // either the download which was started after eviction completed already, or it was
910 : // never evicted
911 : Status::Resident => Err(EvictionError::Downloaded),
912 : }
913 : }
914 :
915 : /// Cancellation safe.
916 212700 : async fn get_or_maybe_download(
917 212700 : self: &Arc<Self>,
918 212700 : allow_download: bool,
919 212700 : ctx: Option<&RequestContext>,
920 212700 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
921 16 : let (weak, permit) = {
922 : // get_or_init_detached can:
923 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
924 : // - be slow (wait for semaphore permit or closing)
925 212700 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
926 :
927 212700 : let locked = self
928 212700 : .inner
929 212700 : .get_or_init_detached()
930 1 : .await
931 212700 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
932 212700 :
933 212700 : scopeguard::ScopeGuard::into_inner(init_cancelled);
934 :
935 212684 : match locked {
936 : // this path could have been a RwLock::read
937 212684 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
938 0 : Ok(Ok((strong, _))) => {
939 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
940 0 : // previously a `evict_and_wait` was received. this is the only place when we
941 0 : // send out an update without holding the InitPermit.
942 0 : //
943 0 : // note that we also have dropped the Guard; this is fine, because we just made
944 0 : // a state change and are holding a strong reference to be returned.
945 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
946 0 : LAYER_IMPL_METRICS
947 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
948 0 :
949 0 : return Ok(strong);
950 : }
951 8 : Ok(Err(guard)) => {
952 8 : // path to here: we won the eviction, the file should still be on the disk.
953 8 : let (weak, permit) = guard.take_and_deinit();
954 8 : (Some(weak), permit)
955 : }
956 8 : Err(permit) => (None, permit),
957 : }
958 : };
959 :
960 16 : if let Some(weak) = weak {
961 : // only drop the weak after dropping the heavier_once_cell guard
962 8 : assert!(
963 8 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
964 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
965 : );
966 8 : }
967 :
968 16 : let timeline = self
969 16 : .timeline
970 16 : .upgrade()
971 16 : .ok_or_else(|| DownloadError::TimelineShutdown)?;
972 :
973 : // count cancellations, which currently remain largely unexpected
974 16 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
975 :
976 : // check if we really need to be downloaded: this can happen if a read access won the
977 : // semaphore before eviction.
978 : //
979 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
980 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
981 16 : let needs_download = self
982 16 : .needs_download()
983 15 : .await
984 16 : .map_err(DownloadError::PreStatFailed);
985 16 :
986 16 : scopeguard::ScopeGuard::into_inner(init_cancelled);
987 :
988 16 : let needs_download = needs_download?;
989 :
990 16 : let Some(reason) = needs_download else {
991 : // the file is present locally because eviction has not had a chance to run yet
992 :
993 : #[cfg(test)]
994 8 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
995 2 : .await?;
996 :
997 6 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
998 6 :
999 6 : return Ok(self.initialize_after_layer_is_on_disk(permit));
1000 : };
1001 :
1002 : // we must download; getting cancelled before spawning the download is not an issue as
1003 : // any still running eviction would not find anything to evict.
1004 :
1005 8 : if let NeedsDownload::NotFile(ft) = reason {
1006 0 : return Err(DownloadError::NotFile(ft));
1007 8 : }
1008 :
1009 8 : if let Some(ctx) = ctx {
1010 2 : self.check_expected_download(ctx)?;
1011 6 : }
1012 :
1013 8 : if !allow_download {
1014 : // this is only used from tests, but it is hard to test without the boolean
1015 2 : return Err(DownloadError::DownloadRequired);
1016 6 : }
1017 6 :
1018 6 : let download_ctx = ctx
1019 6 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
1020 6 : .unwrap_or(RequestContext::new(
1021 6 : TaskKind::LayerDownload,
1022 6 : DownloadBehavior::Download,
1023 6 : ));
1024 :
1025 6 : async move {
1026 6 : tracing::info!(%reason, "downloading on-demand");
1027 :
1028 6 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1029 6 : let res = self
1030 6 : .download_init_and_wait(timeline, permit, download_ctx)
1031 10 : .await?;
1032 6 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1033 6 : Ok(res)
1034 6 : }
1035 6 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
1036 10 : .await
1037 212700 : }
1038 :
1039 : /// Nag or fail per RequestContext policy
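///
/// A sketch of the policy (hypothetical `inner: &LayerInner` receiver; `Warn` and `Error` both
/// log and bump a metric, and `Error` really fails only when the config does not downgrade it
/// to a warning):
///
/// ```ignore
/// let permissive = RequestContext::new(TaskKind::LayerDownload, DownloadBehavior::Download);
/// assert!(inner.check_expected_download(&permissive).is_ok());
///
/// let strict = RequestContext::new(TaskKind::LayerDownload, DownloadBehavior::Error);
/// // may return Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
/// let _ = inner.check_expected_download(&strict);
/// ```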
1040 2 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1041 2 : use crate::context::DownloadBehavior::*;
1042 2 : let b = ctx.download_behavior();
1043 2 : match b {
1044 2 : Download => Ok(()),
1045 : Warn | Error => {
1046 0 : tracing::info!(
1047 0 : "unexpectedly on-demand downloading for task kind {:?}",
1048 0 : ctx.task_kind()
1049 : );
1050 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1051 :
1052 0 : let really_error =
1053 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1054 :
1055 0 : if really_error {
1056 : // this check is only probablistic, seems like flakyness footgun
1057 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1058 : } else {
1059 0 : Ok(())
1060 : }
1061 : }
1062 : }
1063 2 : }
1064 :
1065 : /// Actual download; at most one is executed at a time.
1066 6 : async fn download_init_and_wait(
1067 6 : self: &Arc<Self>,
1068 6 : timeline: Arc<Timeline>,
1069 6 : permit: heavier_once_cell::InitPermit,
1070 6 : ctx: RequestContext,
1071 6 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1072 6 : debug_assert_current_span_has_tenant_and_timeline_id();
1073 6 :
1074 6 : let (tx, rx) = tokio::sync::oneshot::channel();
1075 6 :
1076 6 : let this: Arc<Self> = self.clone();
1077 :
1078 6 : let guard = timeline
1079 6 : .gate
1080 6 : .enter()
1081 6 : .map_err(|_| DownloadError::DownloadCancelled)?;
1082 :
1083 6 : Self::spawn(
1084 6 : async move {
1085 0 : let _guard = guard;
1086 0 :
1087 0 : // now that we have committed to downloading, send out an update to:
1088 0 : // - unhang any pending eviction
1089 0 : // - break out of evict_and_wait
1090 0 : this.status
1091 0 : .as_ref()
1092 0 : .unwrap()
1093 0 : .send_replace(Status::Downloading);
1094 6 :
1095 6 : #[cfg(test)]
1096 6 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1097 2 : .await
1098 6 : .unwrap();
1099 :
1100 99 : let res = this.download_and_init(timeline, permit, &ctx).await;
1101 :
1102 6 : if let Err(res) = tx.send(res) {
1103 0 : match res {
1104 0 : Ok(_res) => {
1105 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1106 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1107 : }
1108 0 : Err(e) => {
1109 0 : tracing::info!(
1110 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1111 : );
1112 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1113 : }
1114 : }
1115 6 : }
1116 6 : }
1117 6 : .in_current_span(),
1118 6 : );
1119 6 :
1120 10 : match rx.await {
1121 6 : Ok(Ok(res)) => Ok(res),
1122 : Ok(Err(remote_storage::DownloadError::Cancelled)) => {
1123 0 : Err(DownloadError::DownloadCancelled)
1124 : }
1125 0 : Ok(Err(_)) => Err(DownloadError::DownloadFailed),
1126 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1127 : }
1128 6 : }
1129 :
1130 6 : async fn download_and_init(
1131 6 : self: &Arc<LayerInner>,
1132 6 : timeline: Arc<Timeline>,
1133 6 : permit: heavier_once_cell::InitPermit,
1134 6 : ctx: &RequestContext,
1135 6 : ) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
1136 6 : let result = timeline
1137 6 : .remote_client
1138 6 : .download_layer_file(
1139 6 : &self.desc.layer_name(),
1140 6 : &self.metadata(),
1141 6 : &self.path,
1142 6 : &timeline.cancel,
1143 6 : ctx,
1144 6 : )
1145 93 : .await;
1146 :
1147 6 : match result {
1148 6 : Ok(size) => {
1149 6 : assert_eq!(size, self.desc.file_size);
1150 :
1151 6 : match self.needs_download().await {
1152 0 : Ok(Some(reason)) => {
1153 0 : // this is really a bug in needs_download or remote timeline client
1154 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1155 : }
1156 6 : Ok(None) => {
1157 6 : // as expected
1158 6 : }
1159 0 : Err(e) => {
1160 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1161 : }
1162 : }
1163 :
1164 6 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1165 6 : timeline
1166 6 : .metrics
1167 6 : .resident_physical_size_add(self.desc.file_size);
1168 6 : self.consecutive_failures.store(0, Ordering::Relaxed);
1169 6 :
1170 6 : let since_last_eviction = self
1171 6 : .last_evicted_at
1172 6 : .lock()
1173 6 : .unwrap()
1174 6 : .take()
1175 6 : .map(|ts| ts.elapsed());
1176 6 : if let Some(since_last_eviction) = since_last_eviction {
1177 6 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1178 6 : }
1179 :
1180 6 : self.access_stats.record_residence_event();
1181 6 :
1182 6 : Ok(self.initialize_after_layer_is_on_disk(permit))
1183 : }
1184 0 : Err(e) => {
1185 0 : let consecutive_failures =
1186 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1187 0 :
1188 0 : if timeline.cancel.is_cancelled() {
1189 : // If we're shutting down, drop out before logging the error
1190 0 : return Err(e);
1191 0 : }
1192 0 :
1193 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1194 :
1195 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1196 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1197 0 : 1.5,
1198 0 : 60.0,
1199 0 : );
1200 0 :
1201 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1202 :
1203 : tokio::select! {
1204 : _ = tokio::time::sleep(backoff) => {},
1205 : _ = timeline.cancel.cancelled() => {},
1206 : };
1207 :
1208 0 : Err(e)
1209 : }
1210 : }
1211 6 : }
1212 :
1213 : /// Initializes the `Self::inner` to a "resident" state.
1214 : ///
1215 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1216 : /// before calling this method.
1217 : ///
1218 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1219 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1220 12 : fn initialize_after_layer_is_on_disk(
1221 12 : self: &Arc<LayerInner>,
1222 12 : permit: heavier_once_cell::InitPermit,
1223 12 : ) -> Arc<DownloadedLayer> {
1224 12 : debug_assert_current_span_has_tenant_and_timeline_id();
1225 12 :
1226 12 : // disable any scheduled but not yet running eviction deletions for this initialization
1227 12 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1228 12 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1229 12 :
1230 12 : let res = Arc::new(DownloadedLayer {
1231 12 : owner: Arc::downgrade(self),
1232 12 : kind: tokio::sync::OnceCell::default(),
1233 12 : version: next_version,
1234 12 : });
1235 12 :
1236 12 : let waiters = self.inner.initializer_count();
1237 12 : if waiters > 0 {
1238 0 : tracing::info!(waiters, "completing layer init for other tasks");
1239 12 : }
1240 :
1241 12 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1242 12 :
1243 12 : self.inner.set(value, permit);
1244 12 :
1245 12 : res
1246 12 : }
1247 :
1248 24 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1249 24 : match tokio::fs::metadata(&self.path).await {
1250 16 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1251 8 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1252 0 : Err(e) => Err(e),
1253 : }
1254 24 : }
1255 :
1256 24 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1257 24 : match self.path.metadata() {
1258 24 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1259 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1260 0 : Err(e) => Err(e),
1261 : }
1262 24 : }
1263 :
1264 40 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1265 40 : // in the future, this should include sha2-256 validation of the file.
1266 40 : if !m.is_file() {
1267 0 : Err(NeedsDownload::NotFile(m.file_type()))
1268 40 : } else if m.len() != self.desc.file_size {
1269 0 : Err(NeedsDownload::WrongSize {
1270 0 : actual: m.len(),
1271 0 : expected: self.desc.file_size,
1272 0 : })
1273 : } else {
1274 40 : Ok(())
1275 : }
1276 40 : }
1277 :
1278 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1279 0 : let layer_name = self.desc.layer_name().to_string();
1280 0 :
1281 0 : let resident = self
1282 0 : .inner
1283 0 : .get()
1284 0 : .map(|rowe| rowe.is_likely_resident())
1285 0 : .unwrap_or(false);
1286 0 :
1287 0 : let access_stats = self.access_stats.as_api_model(reset);
1288 0 :
1289 0 : if self.desc.is_delta {
1290 0 : let lsn_range = &self.desc.lsn_range;
1291 0 :
1292 0 : HistoricLayerInfo::Delta {
1293 0 : layer_file_name: layer_name,
1294 0 : layer_file_size: self.desc.file_size,
1295 0 : lsn_start: lsn_range.start,
1296 0 : lsn_end: lsn_range.end,
1297 0 : remote: !resident,
1298 0 : access_stats,
1299 0 : l0: crate::tenant::layer_map::LayerMap::is_l0(&self.layer_desc().key_range),
1300 0 : }
1301 : } else {
1302 0 : let lsn = self.desc.image_layer_lsn();
1303 0 :
1304 0 : HistoricLayerInfo::Image {
1305 0 : layer_file_name: layer_name,
1306 0 : layer_file_size: self.desc.file_size,
1307 0 : lsn_start: lsn,
1308 0 : remote: !resident,
1309 0 : access_stats,
1310 0 : }
1311 : }
1312 0 : }
1313 :
1314 : /// `DownloadedLayer` is being dropped, so it calls this method.
1315 24 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1316 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1317 : // though here it is very likely
1318 24 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1319 :
1320 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1321 : // drop while the `self.inner` is being locked, leading to a deadlock.
1322 :
1323 24 : let start_evicting = async move {
1324 24 : #[cfg(test)]
1325 24 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1326 14 : .await
1327 24 : .expect("failpoint should not have errored");
1328 24 :
1329 24 : tracing::debug!("eviction started");
1330 :
1331 24 : let res = self.wait_for_turn_and_evict(only_version).await;
1332 : // metrics: ignore the Ok branch, it is not done yet
1333 24 : if let Err(e) = res {
1334 6 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1335 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1336 18 : }
1337 24 : };
1338 :
1339 24 : Self::spawn(start_evicting.instrument(span));
1340 24 : }
1341 :
1342 24 : async fn wait_for_turn_and_evict(
1343 24 : self: Arc<LayerInner>,
1344 24 : only_version: usize,
1345 24 : ) -> Result<(), EvictionCancelled> {
1346 46 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1347 46 : use Status::*;
1348 46 : match status {
1349 44 : Resident => Ok(()),
1350 2 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1351 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1352 : }
1353 46 : }
1354 :
1355 24 : let timeline = self
1356 24 : .timeline
1357 24 : .upgrade()
1358 24 : .ok_or(EvictionCancelled::TimelineGone)?;
1359 :
1360 24 : let mut rx = self
1361 24 : .status
1362 24 : .as_ref()
1363 24 : .expect("LayerInner cannot be dropped, holding strong ref")
1364 24 : .subscribe();
1365 24 :
1366 24 : is_good_to_continue(&rx.borrow_and_update())?;
1367 :
1368 22 : let Ok(gate) = timeline.gate.enter() else {
1369 0 : return Err(EvictionCancelled::TimelineGone);
1370 : };
1371 :
1372 18 : let permit = {
1373 : // we cannot just `std::fs::remove_file` because there might already be a
1374 : // get_or_maybe_download which will inspect the filesystem and reinitialize. filesystem
1375 : // operations must be done while holding the heavier_once_cell::InitPermit
1376 22 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1377 :
1378 22 : let waited = loop {
1379 22 : // we must race against the start of Downloading, otherwise we would have to wait until the
1380 22 : // completion of the download. waiting for download could be long and hinder our
1381 22 : // efforts to alert on "hanging" evictions.
1382 22 : tokio::select! {
1383 : res = &mut wait => break res,
1384 : _ = rx.changed() => {
1385 : is_good_to_continue(&rx.borrow_and_update())?;
1386 : // two possibilities for Status::Resident:
1387 : // - the layer was found locally from disk by a read
1388 : // - we missed a bunch of updates and now the layer is
1389 : // again downloaded -- assume we'll fail later on with
1390 : // version check or AlreadyReinitialized
1391 : }
1392 22 : }
1393 22 : };
1394 :
1395 : // re-check now that we have the guard or permit; all updates should have happened
1396 : // while holding the permit.
1397 22 : is_good_to_continue(&rx.borrow_and_update())?;
1398 :
1399 : // the term deinitialize is used here, because clearing out the Weak will eventually
1400 : // lead to deallocating the reference counted value, and the value we
1401 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1402 22 : let (_weak, permit) = match waited {
1403 20 : Ok(guard) => {
1404 20 : match &*guard {
1405 18 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1406 18 : if *version == only_version =>
1407 16 : {
1408 16 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1409 16 : let (weak, permit) = guard.take_and_deinit();
1410 16 : (Some(weak), permit)
1411 : }
1412 2 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1413 2 : // if we were not doing the version check, we would need to try to
1414 2 : // upgrade the weak here to see if it really is dropped. version check
1415 2 : // is done instead assuming that it is cheaper.
1416 2 : tracing::debug!(
1417 : version,
1418 : only_version,
1419 0 : "version mismatch, not deinitializing"
1420 : );
1421 2 : return Err(EvictionCancelled::VersionCheckFailed);
1422 : }
1423 : ResidentOrWantedEvicted::Resident(_) => {
1424 2 : return Err(EvictionCancelled::AlreadyReinitialized);
1425 : }
1426 : }
1427 : }
1428 2 : Err(permit) => {
1429 2 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1430 2 : (None, permit)
1431 : }
1432 : };
1433 :
1434 18 : permit
1435 18 : };
1436 18 :
1437 18 : let span = tracing::Span::current();
1438 18 :
1439 18 : let spawned_at = std::time::Instant::now();
1440 18 :
1441 18 : // this is on purpose a detached spawn; we don't need to wait for it
1442 18 : //
1443 18 : // eviction completion reporting is the only thing hinging on this, and it can be done just
1444 18 : // as well from a spawn_blocking thread.
1445 18 : //
1446 18 : // important to note that now that we've acquired the permit we have made sure the evicted
1447 18 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1448 18 : // there are multiple evictions. The rest is not cancellable, and we've now commited to
1449 18 : // evicting.
1450 18 : //
1451 18 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1452 18 : // reads. We will need to add cancellation for that if necessary.
1453 18 : Self::spawn_blocking(move || {
1454 18 : let _span = span.entered();
1455 18 :
1456 18 : let res = self.evict_blocking(&timeline, &gate, &permit);
1457 18 :
1458 18 : let waiters = self.inner.initializer_count();
1459 18 :
1460 18 : if waiters > 0 {
1461 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1462 18 : }
1463 :
1464 18 : let completed_in = spawned_at.elapsed();
1465 18 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1466 18 :
1467 18 : match res {
1468 18 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1469 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1470 : }
1471 :
1472 18 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1473 18 : });
1474 18 :
1475 18 : Ok(())
1476 24 : }
1477 :
1478 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1479 18 : fn evict_blocking(
1480 18 : &self,
1481 18 : timeline: &Timeline,
1482 18 : _gate: &gate::GateGuard,
1483 18 : _permit: &heavier_once_cell::InitPermit,
1484 18 : ) -> Result<(), EvictionCancelled> {
1485 18 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1486 18 :
1487 18 : match capture_mtime_and_remove(&self.path) {
1488 18 : Ok(local_layer_mtime) => {
1489 18 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1490 18 : match duration {
1491 18 : Ok(elapsed) => {
1492 18 : let accessed = self.access_stats.accessed();
1493 18 : if accessed {
1494 4 : // Only layers used for reads contribute to our "low residence" metric that is used
1495 4 : // to detect thrashing. Layers promoted for other reasons (e.g. compaction) are allowed
1496 4 : // to be rapidly evicted without contributing to this metric.
1497 4 : timeline
1498 4 : .metrics
1499 4 : .evictions_with_low_residence_duration
1500 4 : .read()
1501 4 : .unwrap()
1502 4 : .observe(elapsed);
1503 14 : }
1504 :
1505 18 : tracing::info!(
1506 0 : residence_millis = elapsed.as_millis(),
1507 0 : accessed,
1508 0 : "evicted layer after known residence period"
1509 : );
1510 : }
1511 : Err(_) => {
1512 0 : tracing::info!("evicted layer after unknown residence period");
1513 : }
1514 : }
1515 18 : timeline.metrics.evictions.inc();
1516 18 : timeline
1517 18 : .metrics
1518 18 : .resident_physical_size_sub(self.desc.file_size);
1519 : }
1520 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1521 0 : tracing::error!(
1522 : layer_size = %self.desc.file_size,
1523 0 : "failed to evict layer from disk, it was already gone"
1524 : );
1525 0 : return Err(EvictionCancelled::FileNotFound);
1526 : }
1527 0 : Err(e) => {
1528 0 : // FIXME: this should probably be an abort
1529 0 : tracing::error!("failed to evict file from disk: {e:#}");
1530 0 : return Err(EvictionCancelled::RemoveFailed);
1531 : }
1532 : }
1533 :
1534 18 : self.access_stats.record_residence_event();
1535 18 :
1536 18 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1537 18 :
1538 18 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1539 18 :
1540 18 : Ok(())
1541 18 : }
1542 :
1543 2420 : fn metadata(&self) -> LayerFileMetadata {
1544 2420 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1545 2420 : }
1546 :
1547 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1548 : ///
1549 : /// Synchronizing with spawned tasks is very complicated otherwise.
1550 30 : fn spawn<F>(fut: F)
1551 30 : where
1552 30 : F: std::future::Future<Output = ()> + Send + 'static,
1553 30 : {
1554 30 : #[cfg(test)]
1555 30 : tokio::task::spawn(fut);
1556 30 : #[cfg(not(test))]
1557 30 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1558 30 : }
1559 :
1560 : /// Needed to use the entered runtime in tests, but otherwise uses BACKGROUND_RUNTIME.
1561 474 : fn spawn_blocking<F>(f: F)
1562 474 : where
1563 474 : F: FnOnce() + Send + 'static,
1564 474 : {
1565 474 : #[cfg(test)]
1566 474 : tokio::task::spawn_blocking(f);
1567 474 : #[cfg(not(test))]
1568 474 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1569 474 : }
1570 : }
1571 :
1572 18 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1573 18 : let m = path.metadata()?;
1574 18 : let local_layer_mtime = m.modified()?;
1575 18 : std::fs::remove_file(path)?;
1576 18 : Ok(local_layer_mtime)
1577 18 : }
1578 :
1579 0 : #[derive(Debug, thiserror::Error)]
1580 : pub(crate) enum EvictionError {
1581 : #[error("layer was already evicted")]
1582 : NotFound,
1583 :
1584 : /// Evictions must always lose to downloads in races, and this time it happened.
1585 : #[error("layer was downloaded instead")]
1586 : Downloaded,
1587 :
1588 : #[error("eviction did not happen within timeout")]
1589 : Timeout,
1590 : }
1591 :
1592 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1593 0 : #[derive(Debug, thiserror::Error)]
1594 : pub(crate) enum DownloadError {
1595 : #[error("timeline has already shutdown")]
1596 : TimelineShutdown,
1597 : #[error("context denies downloading")]
1598 : ContextAndConfigReallyDeniesDownloads,
1599 : #[error("downloading is really required but not allowed by this method")]
1600 : DownloadRequired,
1601 : #[error("layer path exists, but it is not a file: {0:?}")]
1602 : NotFile(std::fs::FileType),
1603 : /// Why no error here? Because it will be reported by page_service. We should have also done
1604 : /// retries already.
1605 : #[error("downloading evicted layer file failed")]
1606 : DownloadFailed,
1607 : #[error("downloading failed, possibly for shutdown")]
1608 : DownloadCancelled,
1609 : #[error("pre-condition: stat before download failed")]
1610 : PreStatFailed(#[source] std::io::Error),
1611 :
1612 : #[cfg(test)]
1613 : #[error("failpoint: {0:?}")]
1614 : Failpoint(failpoints::FailpointKind),
1615 : }
1616 :
1617 : impl DownloadError {
1618 0 : pub(crate) fn is_cancelled(&self) -> bool {
1619 0 : matches!(self, DownloadError::DownloadCancelled)
1620 0 : }
1621 : }
1622 :
1623 : #[derive(Debug, PartialEq)]
1624 : pub(crate) enum NeedsDownload {
1625 : NotFound,
1626 : NotFile(std::fs::FileType),
1627 : WrongSize { actual: u64, expected: u64 },
1628 : }
1629 :
1630 : impl std::fmt::Display for NeedsDownload {
1631 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1632 6 : match self {
1633 6 : NeedsDownload::NotFound => write!(f, "file was not found"),
1634 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1635 0 : NeedsDownload::WrongSize { actual, expected } => {
1636 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1637 : }
1638 : }
1639 6 : }
1640 : }
1641 :
1642 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1643 : pub(crate) struct DownloadedLayer {
1644 : owner: Weak<LayerInner>,
1645 : // Use tokio OnceCell as we do not need to deinitialize this; it'll just get dropped with the
1646 : // DownloadedLayer.
1647 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
1648 : version: usize,
1649 : }
1650 :
1651 : impl std::fmt::Debug for DownloadedLayer {
1652 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1653 0 : f.debug_struct("DownloadedLayer")
1654 0 : // owner omitted because it is always "Weak"
1655 0 : .field("kind", &self.kind)
1656 0 : .field("version", &self.version)
1657 0 : .finish()
1658 0 : }
1659 : }
1660 :
1661 : impl Drop for DownloadedLayer {
1662 572 : fn drop(&mut self) {
1663 572 : if let Some(owner) = self.owner.upgrade() {
1664 24 : owner.on_downloaded_layer_drop(self.version);
1665 548 : } else {
1666 548 : // Layer::drop will handle cancelling the eviction; because of drop order and
1667 548 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1668 548 : }
1669 572 : }
1670 : }
1671 :
1672 : impl DownloadedLayer {
1673 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`].
1674 : /// Failure to load the layer is sticky, i.e., future `get()` calls will return
1675 : /// the initial load failure immediately.
1676 : ///
1677 : /// `owner` parameter is a strong reference at the same `LayerInner` as the
1678 : /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
1679 : /// we will always have the LayerInner on the callstack, so we can just use it.
1680 213158 : async fn get<'a>(
1681 213158 : &'a self,
1682 213158 : owner: &Arc<LayerInner>,
1683 213158 : ctx: &RequestContext,
1684 213158 : ) -> anyhow::Result<&'a LayerKind> {
1685 213158 : let init = || async {
1686 1148 : assert_eq!(
1687 1148 : Weak::as_ptr(&self.owner),
1688 1148 : Arc::as_ptr(owner),
1689 1148 : "these are the same, just avoiding the upgrade"
1690 1148 : );
1691 1148 :
1692 1148 : let res = if owner.desc.is_delta {
1693 1148 : let summary = Some(delta_layer::Summary::expected(
1694 1050 : owner.desc.tenant_shard_id.tenant_id,
1695 1050 : owner.desc.timeline_id,
1696 1050 : owner.desc.key_range.clone(),
1697 1050 : owner.desc.lsn_range.clone(),
1698 1050 : ));
1699 1050 : delta_layer::DeltaLayerInner::load(
1700 1050 : &owner.path,
1701 1050 : summary,
1702 1050 : Some(owner.conf.max_vectored_read_bytes),
1703 1050 : ctx,
1704 1050 : )
1705 1148 : .await
1706 1148 : .map(LayerKind::Delta)
1707 1148 : } else {
1708 1148 : let lsn = owner.desc.image_layer_lsn();
1709 98 : let summary = Some(image_layer::Summary::expected(
1710 98 : owner.desc.tenant_shard_id.tenant_id,
1711 98 : owner.desc.timeline_id,
1712 98 : owner.desc.key_range.clone(),
1713 98 : lsn,
1714 98 : ));
1715 98 : image_layer::ImageLayerInner::load(
1716 98 : &owner.path,
1717 98 : lsn,
1718 98 : summary,
1719 98 : Some(owner.conf.max_vectored_read_bytes),
1720 98 : ctx,
1721 98 : )
1722 1148 : .await
1723 1148 : .map(LayerKind::Image)
1724 1148 : };
1725 1148 :
1726 1148 : match res {
1727 1148 : Ok(layer) => Ok(layer),
1728 1148 : Err(err) => {
1729 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1730 0 : // We log this message once over the lifetime of `Self`
1731 0 : // => Ok and good to log backtrace and path here.
1732 0 : tracing::error!(
1733 1148 : "layer load failed, assuming permanent failure: {}: {err:?}",
1734 0 : owner.path
1735 1148 : );
1736 1148 : Err(err)
1737 1148 : }
1738 1148 : }
1739 1148 : };
1740 213158 : self.kind
1741 213158 : .get_or_init(init)
1742 1154 : .await
1743 213158 : .as_ref()
1744 213158 : // We already logged the full backtrace above, once. Don't repeat that here.
1745 213158 : .map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
1746 213158 : }
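
// Editorial aside -- a hedged illustration, not part of the pageserver sources: caching a
// `Result` inside `tokio::sync::OnceCell`, as `self.kind` does above, is exactly what makes a
// load failure sticky -- the first outcome (Ok or Err) is stored and every later `get()`
// observes it without retrying. A minimal standalone sketch of the pattern:
//
//     use tokio::sync::OnceCell;
//
//     struct Loader {
//         slot: OnceCell<Result<u32, String>>,
//     }
//
//     impl Loader {
//         async fn get(&self) -> Result<u32, &str> {
//             self.slot
//                 .get_or_init(|| async { expensive_load().await })
//                 .await
//                 .as_ref()
//                 .copied()
//                 .map_err(|e| e.as_str())
//         }
//     }
//
//     async fn expensive_load() -> Result<u32, String> {
//         // runs once; an Err stored here is returned to all future callers
//         Err("permanent failure".to_string())
//     }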
1747 :
1748 212190 : async fn get_values_reconstruct_data(
1749 212190 : &self,
1750 212190 : keyspace: KeySpace,
1751 212190 : lsn_range: Range<Lsn>,
1752 212190 : reconstruct_data: &mut ValuesReconstructState,
1753 212190 : owner: &Arc<LayerInner>,
1754 212190 : ctx: &RequestContext,
1755 212190 : ) -> Result<(), GetVectoredError> {
1756 212190 : use LayerKind::*;
1757 212190 :
1758 212190 : match self
1759 212190 : .get(owner, ctx)
1760 864 : .await
1761 212190 : .map_err(GetVectoredError::Other)?
1762 : {
1763 204452 : Delta(d) => {
1764 204452 : d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
1765 92063 : .await
1766 : }
1767 7738 : Image(i) => {
1768 7738 : i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
1769 5469 : .await
1770 : }
1771 : }
1772 212190 : }
1773 :
1774 0 : async fn load_key_values(
1775 0 : &self,
1776 0 : owner: &Arc<LayerInner>,
1777 0 : ctx: &RequestContext,
1778 0 : ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
1779 0 : use LayerKind::*;
1780 0 :
1781 0 : match self.get(owner, ctx).await? {
1782 0 : Delta(d) => d.load_key_values(ctx).await,
1783 0 : Image(i) => i.load_key_values(ctx).await,
1784 : }
1785 0 : }
1786 :
1787 4 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1788 4 : use LayerKind::*;
1789 4 : match self.get(owner, ctx).await? {
1790 4 : Delta(d) => d.dump(ctx).await?,
1791 0 : Image(i) => i.dump(ctx).await?,
1792 : }
1793 :
1794 4 : Ok(())
1795 4 : }
1796 : }
1797 :
1798 : /// Wrapper around an actual layer implementation.
1799 : #[derive(Debug)]
1800 : enum LayerKind {
1801 : Delta(delta_layer::DeltaLayerInner),
1802 : Image(image_layer::ImageLayerInner),
1803 : }
1804 :
1805 : /// Guard for forcing a layer to be resident while it exists.
1806 : #[derive(Clone)]
1807 : pub(crate) struct ResidentLayer {
1808 : owner: Layer,
1809 : downloaded: Arc<DownloadedLayer>,
1810 : }
1811 :
1812 : impl std::fmt::Display for ResidentLayer {
1813 1926 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1814 1926 : write!(f, "{}", self.owner)
1815 1926 : }
1816 : }
1817 :
1818 : impl std::fmt::Debug for ResidentLayer {
1819 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1820 0 : write!(f, "{}", self.owner)
1821 0 : }
1822 : }
1823 :
1824 : impl ResidentLayer {
1825 : /// Release the eviction guard, converting back into a plain [`Layer`].
1826 : ///
1827 : /// You can access the [`Layer`] also by using `as_ref`.
1828 420 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1829 420 : self.into()
1830 420 : }
1831 :
1832 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1833 804 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1834 : pub(crate) async fn load_keys<'a>(
1835 : &'a self,
1836 : ctx: &RequestContext,
1837 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1838 : use LayerKind::*;
1839 :
1840 : let owner = &self.owner.0;
1841 : match self.downloaded.get(owner, ctx).await? {
1842 : Delta(ref d) => {
1843 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1844 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1845 : // while it's being held.
1846 : self.owner.record_access(ctx);
1847 :
1848 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1849 : .await
1850 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1851 : }
1852 : Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
1853 : }
1854 : }
1855 :
1856 : /// Read all the keys in this layer which match the ShardIdentity, and write them all to
1857 : /// the provided writer. Return the number of keys written.
1858 16 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1859 : pub(crate) async fn filter(
1860 : &self,
1861 : shard_identity: &ShardIdentity,
1862 : writer: &mut ImageLayerWriter,
1863 : ctx: &RequestContext,
1864 : ) -> Result<usize, CompactionError> {
1865 : use LayerKind::*;
1866 :
1867 : match self
1868 : .downloaded
1869 : .get(&self.owner.0, ctx)
1870 : .await
1871 : .map_err(CompactionError::Other)?
1872 : {
1873 : Delta(_) => {
1874 : return Err(CompactionError::Other(anyhow::anyhow!(format!(
1875 : "cannot filter() on a delta layer {self}"
1876 : ))));
1877 : }
1878 : Image(i) => i
1879 : .filter(shard_identity, writer, ctx)
1880 : .await
1881 : .map_err(CompactionError::Other),
1882 : }
1883 : }
1884 :
1885 : /// Returns the number of keys and values written to the writer.
1886 10 : pub(crate) async fn copy_delta_prefix(
1887 10 : &self,
1888 10 : writer: &mut super::delta_layer::DeltaLayerWriter,
1889 10 : until: Lsn,
1890 10 : ctx: &RequestContext,
1891 10 : ) -> anyhow::Result<usize> {
1892 10 : use LayerKind::*;
1893 10 :
1894 10 : let owner = &self.owner.0;
1895 10 :
1896 10 : match self.downloaded.get(owner, ctx).await? {
1897 10 : Delta(ref d) => d
1898 10 : .copy_prefix(writer, until, ctx)
1899 13 : .await
1900 10 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1901 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1902 : }
1903 10 : }
1904 :
1905 1657 : pub(crate) fn local_path(&self) -> &Utf8Path {
1906 1657 : &self.owner.0.path
1907 1657 : }
1908 :
1909 1544 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1910 1544 : self.owner.metadata()
1911 1544 : }
1912 :
1913 : /// Cast the layer to a delta, return an error if it is an image layer.
1914 516 : pub(crate) async fn get_as_delta(
1915 516 : &self,
1916 516 : ctx: &RequestContext,
1917 516 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1918 516 : use LayerKind::*;
1919 516 : match self.downloaded.get(&self.owner.0, ctx).await? {
1920 516 : Delta(ref d) => Ok(d),
1921 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1922 : }
1923 516 : }
1924 :
1925 : /// Cast the layer to an image, return an error if it is a delta layer.
1926 28 : pub(crate) async fn get_as_image(
1927 28 : &self,
1928 28 : ctx: &RequestContext,
1929 28 : ) -> anyhow::Result<&image_layer::ImageLayerInner> {
1930 28 : use LayerKind::*;
1931 28 : match self.downloaded.get(&self.owner.0, ctx).await? {
1932 28 : Image(ref d) => Ok(d),
1933 0 : Delta(_) => Err(anyhow::anyhow!("delta layer")),
1934 : }
1935 28 : }
1936 : }
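
// Editorial aside -- a hedged illustration, not part of the pageserver sources: `ResidentLayer`
// pins residency simply by holding a strong `Arc<DownloadedLayer>` alongside the `Layer`; while
// the guard is alive the downloaded state cannot be dropped, so eviction loses the race. A
// minimal standalone sketch of the "guard by keeping a strong Arc" pattern:
//
//     use std::sync::{Arc, Weak};
//
//     struct Downloaded;
//
//     struct ResidentGuard {
//         _keep_alive: Arc<Downloaded>,
//     }
//
//     fn pin(state: &Arc<Downloaded>) -> ResidentGuard {
//         // While the guard exists the strong count stays elevated, so a path that only
//         // holds a Weak reference keeps succeeding at upgrading it.
//         ResidentGuard {
//             _keep_alive: Arc::clone(state),
//         }
//     }
//
//     fn can_drop_state(state: &Weak<Downloaded>) -> bool {
//         // Only once every guard (and other strong reference) is gone does the upgrade fail.
//         state.upgrade().is_none()
//     }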
1937 :
1938 : impl AsLayerDesc for ResidentLayer {
1939 5543 : fn layer_desc(&self) -> &PersistentLayerDesc {
1940 5543 : self.owner.layer_desc()
1941 5543 : }
1942 : }
1943 :
1944 : impl AsRef<Layer> for ResidentLayer {
1945 1920 : fn as_ref(&self) -> &Layer {
1946 1920 : &self.owner
1947 1920 : }
1948 : }
1949 :
1950 : /// Drop the eviction guard.
1951 : impl From<ResidentLayer> for Layer {
1952 420 : fn from(value: ResidentLayer) -> Self {
1953 420 : value.owner
1954 420 : }
1955 : }
1956 :
1957 : use metrics::IntCounter;
1958 :
1959 : pub(crate) struct LayerImplMetrics {
1960 : started_evictions: IntCounter,
1961 : completed_evictions: IntCounter,
1962 : cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
1963 :
1964 : started_deletes: IntCounter,
1965 : completed_deletes: IntCounter,
1966 : failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
1967 :
1968 : rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
1969 : inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
1970 : redownload_after: metrics::Histogram,
1971 : time_to_evict: metrics::Histogram,
1972 : }
1973 :
1974 : impl Default for LayerImplMetrics {
1975 40 : fn default() -> Self {
1976 40 : use enum_map::Enum;
1977 40 :
1978 40 : // reminder: these will be pageserver_layer_* with "_total" suffix
1979 40 :
1980 40 : let started_evictions = metrics::register_int_counter!(
1981 : "pageserver_layer_started_evictions",
1982 : "Evictions started in the Layer implementation"
1983 : )
1984 40 : .unwrap();
1985 40 : let completed_evictions = metrics::register_int_counter!(
1986 : "pageserver_layer_completed_evictions",
1987 : "Evictions completed in the Layer implementation"
1988 : )
1989 40 : .unwrap();
1990 40 :
1991 40 : let cancelled_evictions = metrics::register_int_counter_vec!(
1992 : "pageserver_layer_cancelled_evictions_count",
1993 : "Different reasons for evictions to have been cancelled or failed",
1994 : &["reason"]
1995 : )
1996 40 : .unwrap();
1997 40 :
1998 360 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1999 360 : let reason = EvictionCancelled::from_usize(i);
2000 360 : let s = reason.as_str();
2001 360 : cancelled_evictions.with_label_values(&[s])
2002 360 : }));
2003 40 :
2004 40 : let started_deletes = metrics::register_int_counter!(
2005 : "pageserver_layer_started_deletes",
2006 : "Deletions on drop pending in the Layer implementation"
2007 : )
2008 40 : .unwrap();
2009 40 : let completed_deletes = metrics::register_int_counter!(
2010 : "pageserver_layer_completed_deletes",
2011 : "Deletions on drop completed in the Layer implementation"
2012 : )
2013 40 : .unwrap();
2014 40 :
2015 40 : let failed_deletes = metrics::register_int_counter_vec!(
2016 : "pageserver_layer_failed_deletes_count",
2017 : "Different reasons for deletions on drop to have failed",
2018 : &["reason"]
2019 : )
2020 40 : .unwrap();
2021 40 :
2022 80 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2023 80 : let reason = DeleteFailed::from_usize(i);
2024 80 : let s = reason.as_str();
2025 80 : failed_deletes.with_label_values(&[s])
2026 80 : }));
2027 40 :
2028 40 : let rare_counters = metrics::register_int_counter_vec!(
2029 : "pageserver_layer_assumed_rare_count",
2030 : "Times unexpected or assumed rare event happened",
2031 : &["event"]
2032 : )
2033 40 : .unwrap();
2034 40 :
2035 280 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2036 280 : let event = RareEvent::from_usize(i);
2037 280 : let s = event.as_str();
2038 280 : rare_counters.with_label_values(&[s])
2039 280 : }));
2040 40 :
2041 40 : let inits_cancelled = metrics::register_int_counter!(
2042 : "pageserver_layer_inits_cancelled_count",
2043 : "Times Layer initialization was cancelled",
2044 : )
2045 40 : .unwrap();
2046 40 :
2047 40 : let redownload_after = {
2048 40 : let minute = 60.0;
2049 40 : let hour = 60.0 * minute;
2050 : metrics::register_histogram!(
2051 : "pageserver_layer_redownloaded_after",
2052 : "Time between evicting and re-downloading.",
2053 : vec![
2054 : 10.0,
2055 : 30.0,
2056 : minute,
2057 : 5.0 * minute,
2058 : 15.0 * minute,
2059 : 30.0 * minute,
2060 : hour,
2061 : 12.0 * hour,
2062 : ]
2063 : )
2064 40 : .unwrap()
2065 40 : };
2066 40 :
2067 40 : let time_to_evict = metrics::register_histogram!(
2068 : "pageserver_layer_eviction_held_permit_seconds",
2069 : "Time eviction held the permit.",
2070 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
2071 : )
2072 40 : .unwrap();
2073 40 :
2074 40 : Self {
2075 40 : started_evictions,
2076 40 : completed_evictions,
2077 40 : cancelled_evictions,
2078 40 :
2079 40 : started_deletes,
2080 40 : completed_deletes,
2081 40 : failed_deletes,
2082 40 :
2083 40 : rare_counters,
2084 40 : inits_cancelled,
2085 40 : redownload_after,
2086 40 : time_to_evict,
2087 40 : }
2088 40 : }
2089 : }
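
// Editorial aside -- a hedged illustration, not part of the pageserver sources: the
// `EnumMap::from_array(std::array::from_fn(..))` pattern above resolves one labelled counter per
// enum variant once, at registration time, so the hot path indexes an array instead of formatting
// label strings. A minimal standalone sketch of the same pattern with the `enum_map` crate:
//
//     use enum_map::{Enum, EnumMap};
//
//     #[derive(Enum, Clone, Copy)]
//     enum Reason {
//         Cancelled,
//         Failed,
//     }
//
//     fn label_per_variant() -> EnumMap<Reason, &'static str> {
//         EnumMap::from_array(std::array::from_fn(|i| match Reason::from_usize(i) {
//             Reason::Cancelled => "cancelled",
//             Reason::Failed => "failed",
//         }))
//     }
//
//     // usage: `label_per_variant()[Reason::Cancelled]` is a constant-time array lookup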
2090 :
2091 : impl LayerImplMetrics {
2092 26 : fn inc_started_evictions(&self) {
2093 26 : self.started_evictions.inc();
2094 26 : }
2095 18 : fn inc_completed_evictions(&self) {
2096 18 : self.completed_evictions.inc();
2097 18 : }
2098 8 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2099 8 : self.cancelled_evictions[reason].inc()
2100 8 : }
2101 :
2102 458 : fn inc_started_deletes(&self) {
2103 458 : self.started_deletes.inc();
2104 458 : }
2105 456 : fn inc_completed_deletes(&self) {
2106 456 : self.completed_deletes.inc();
2107 456 : }
2108 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2109 0 : self.failed_deletes[reason].inc();
2110 0 : }
2111 :
2112 : /// Counted separately from failed layer deletes because we will complete the layer deletion
2113 : /// attempt regardless of failure to delete local file.
2114 0 : fn inc_delete_removes_failed(&self) {
2115 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2116 0 : }
2117 :
2118 : /// Expected rare just as cancellations are rare, but we could have cancellations separate from
2119 : /// the single caller which can start the download, so use this counter to separate them.
2120 0 : fn inc_init_completed_without_requester(&self) {
2121 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2122 0 : }
2123 :
2124 : /// Expected rare because cancellations are unexpected, and failures are unexpected
2125 0 : fn inc_download_failed_without_requester(&self) {
2126 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2127 0 : }
2128 :
2129 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2130 : ///
2131 : /// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
2132 : /// Option.
2133 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2134 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2135 0 : }
2136 :
2137 : /// These are only expected up to the [`Self::inc_init_cancelled`] amount when
2138 : /// running with remote storage.
2139 6 : fn inc_init_needed_no_download(&self) {
2140 6 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2141 6 : }
2142 :
2143 : /// Expected rare because all layer files should be readable and good
2144 0 : fn inc_permanent_loading_failures(&self) {
2145 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2146 0 : }
2147 :
2148 0 : fn inc_init_cancelled(&self) {
2149 0 : self.inits_cancelled.inc()
2150 0 : }
2151 :
2152 6 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2153 6 : self.redownload_after.observe(duration.as_secs_f64())
2154 6 : }
2155 :
2156 : /// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
2157 : /// instead cancel eviction if we would have read waiters. We cannot however separate reads
2158 : /// from other evictions, so this could have noise as well.
2159 0 : fn inc_evicted_with_waiters(&self) {
2160 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2161 0 : }
2162 :
2163 : /// Recorded at least initially as the permit is now acquired in async context before
2164 : /// the spawn_blocking action.
2165 18 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2166 18 : self.time_to_evict.observe(duration.as_secs_f64())
2167 18 : }
2168 : }
2169 :
2170 : #[derive(Debug, Clone, Copy, enum_map::Enum)]
2171 : enum EvictionCancelled {
2172 : LayerGone,
2173 : TimelineGone,
2174 : VersionCheckFailed,
2175 : FileNotFound,
2176 : RemoveFailed,
2177 : AlreadyReinitialized,
2178 : /// Not evicted because of a pending reinitialization
2179 : LostToDownload,
2180 : /// After eviction, there was a new layer access which cancelled the eviction.
2181 : UpgradedBackOnAccess,
2182 : UnexpectedEvictedState,
2183 : }
2184 :
2185 : impl EvictionCancelled {
2186 360 : fn as_str(&self) -> &'static str {
2187 360 : match self {
2188 40 : EvictionCancelled::LayerGone => "layer_gone",
2189 40 : EvictionCancelled::TimelineGone => "timeline_gone",
2190 40 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2191 40 : EvictionCancelled::FileNotFound => "file_not_found",
2192 40 : EvictionCancelled::RemoveFailed => "remove_failed",
2193 40 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2194 40 : EvictionCancelled::LostToDownload => "lost_to_download",
2195 40 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2196 40 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2197 : }
2198 360 : }
2199 : }
2200 :
2201 : #[derive(enum_map::Enum)]
2202 : enum DeleteFailed {
2203 : TimelineGone,
2204 : DeleteSchedulingFailed,
2205 : }
2206 :
2207 : impl DeleteFailed {
2208 80 : fn as_str(&self) -> &'static str {
2209 80 : match self {
2210 40 : DeleteFailed::TimelineGone => "timeline_gone",
2211 40 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2212 : }
2213 80 : }
2214 : }
2215 :
2216 : #[derive(enum_map::Enum)]
2217 : enum RareEvent {
2218 : RemoveOnDropFailed,
2219 : InitCompletedWithoutRequester,
2220 : DownloadFailedWithoutRequester,
2221 : UpgradedWantedEvicted,
2222 : InitWithoutDownload,
2223 : PermanentLoadingFailure,
2224 : EvictedWithWaiters,
2225 : }
2226 :
2227 : impl RareEvent {
2228 280 : fn as_str(&self) -> &'static str {
2229 280 : use RareEvent::*;
2230 280 :
2231 280 : match self {
2232 40 : RemoveOnDropFailed => "remove_on_drop_failed",
2233 40 : InitCompletedWithoutRequester => "init_completed_without",
2234 40 : DownloadFailedWithoutRequester => "download_failed_without",
2235 40 : UpgradedWantedEvicted => "raced_wanted_evicted",
2236 40 : InitWithoutDownload => "init_needed_no_download",
2237 40 : PermanentLoadingFailure => "permanent_loading_failure",
2238 40 : EvictedWithWaiters => "evicted_with_waiters",
2239 : }
2240 280 : }
2241 : }
2242 :
2243 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2244 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|