Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::{
5 : HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
6 : };
7 : use pageserver_api::shard::{ShardIndex, TenantShardId};
8 : use std::ops::Range;
9 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10 : use std::sync::{Arc, Weak};
11 : use std::time::{Duration, SystemTime};
12 : use tracing::Instrument;
13 : use utils::id::TimelineId;
14 : use utils::lsn::Lsn;
15 : use utils::sync::heavier_once_cell;
16 :
17 : use crate::config::PageServerConf;
18 : use crate::context::{DownloadBehavior, RequestContext};
19 : use crate::repository::Key;
20 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
21 : use crate::task_mgr::TaskKind;
22 : use crate::tenant::timeline::GetVectoredError;
23 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
24 :
25 : use super::delta_layer::{self, DeltaEntry};
26 : use super::image_layer;
27 : use super::{
28 : AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerName, PersistentLayerDesc,
29 : ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
30 : };
31 :
32 : use utils::generation::Generation;
33 :
34 : #[cfg(test)]
35 : mod tests;
36 :
37 : #[cfg(test)]
38 : mod failpoints;
39 :
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
///
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access to the
/// recent page versions. On-disk layers are stored as files on disk, and are
/// immutable. This type represents the on-disk kind, while the in-memory kind is represented by
/// [`InMemoryLayer`].
///
/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
/// A delta layer contains all modifications within a range of LSNs and keys.
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN.
///
/// This type models the on-disk layers, which can be evicted and on-demand downloaded. As a
/// general goal, read accesses should always win eviction and eviction should not wait for
/// download.
///
/// `Layer` itself is only a handle: cloning it clones the inner `Arc`, and two handles compare
/// equal only when they point to the same `LayerInner` allocation.
///
/// ### State transitions
///
/// The internal state of `Layer` is composed of most importantly the on-filesystem state and the
/// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
/// right size) or deleted.
///
/// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
/// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
/// `heavier_once_cell::InitPermit` has been acquired, any read request
/// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
/// cancel the eviction.
///
/// ```text
/// +-----------------+   get_or_maybe_download    +--------------------------------+
/// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
/// |     ENOENT      |                         /->|                                |
/// +-----------------+                         |  +--------------------------------+
///                ^                            |               |         ^
///                |      get_or_maybe_download |               |         | get_or_maybe_download, either:
/// evict_blocking | /--------------------------/               |         | - upgrade weak to strong
///                | |                                          |         | - re-initialize without download
///                | |                           evict_and_wait |         |
/// +-----------------+                                         v         |
/// | not initialized | on_downloaded_layer_drop  +--------------------------------------+
/// | file is present |<--------------------------| WantedEvicted(Weak<DownloadedLayer>) |
/// +-----------------+                           +--------------------------------------+
/// ```
///
/// ### Unsupported
///
/// - Evicting by the operator deleting files from the filesystem
///
/// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
#[derive(Clone)]
pub(crate) struct Layer(Arc<LayerInner>);
93 :
94 : impl std::fmt::Display for Layer {
95 1436 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 1436 : if matches!(self.0.generation, Generation::Broken) {
97 0 : write!(f, "{}-broken", self.layer_desc().short_id())
98 : } else {
99 1436 : write!(
100 1436 : f,
101 1436 : "{}{}",
102 1436 : self.layer_desc().short_id(),
103 1436 : self.0.generation.get_suffix()
104 1436 : )
105 : }
106 1436 : }
107 : }
108 :
109 : impl std::fmt::Debug for Layer {
110 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 0 : write!(f, "{}", self)
112 0 : }
113 : }
114 :
115 : impl AsLayerDesc for Layer {
116 701952 : fn layer_desc(&self) -> &PersistentLayerDesc {
117 701952 : self.0.layer_desc()
118 701952 : }
119 : }
120 :
121 : impl PartialEq for Layer {
122 4 : fn eq(&self, other: &Self) -> bool {
123 4 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
124 4 : }
125 : }
126 :
127 1192 : pub(crate) fn local_layer_path(
128 1192 : conf: &PageServerConf,
129 1192 : tenant_shard_id: &TenantShardId,
130 1192 : timeline_id: &TimelineId,
131 1192 : layer_file_name: &LayerName,
132 1192 : _generation: &Generation,
133 1192 : ) -> Utf8PathBuf {
134 1192 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
135 1192 :
136 1192 : timeline_path.join(layer_file_name.to_string())
137 1192 :
138 1192 : // TODO: switch to enabling new-style layer paths after next release
139 1192 : // if generation.is_none() {
140 1192 : // // Without a generation, we may only use legacy path style
141 1192 : // timeline_path.join(layer_file_name.to_string())
142 1192 : // } else {
143 1192 : // timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
144 1192 : // }
145 1192 : }
146 :
impl Layer {
    /// Creates a layer value for a file we know to not be resident.
    ///
    /// The local path is computed with [`local_layer_path`] using the metadata's generation.
    /// Access statistics start out with an `Evicted` residence status. No download is started.
    pub(crate) fn for_evicted(
        conf: &'static PageServerConf,
        timeline: &Arc<Timeline>,
        file_name: LayerName,
        metadata: LayerFileMetadata,
    ) -> Self {
        let local_path = local_layer_path(
            conf,
            &timeline.tenant_shard_id,
            &timeline.timeline_id,
            &file_name,
            &metadata.generation,
        );

        let desc = PersistentLayerDesc::from_filename(
            timeline.tenant_shard_id,
            timeline.timeline_id,
            file_name,
            metadata.file_size(),
        );

        let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);

        let owner = Layer(Arc::new(LayerInner::new(
            conf,
            timeline,
            local_path,
            access_stats,
            desc,
            None,
            metadata.generation,
            metadata.shard,
        )));

        // debug builds only: `needs_download_blocking` must report `Some` for an evicted layer
        debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());

        owner
    }

    /// Creates a Layer value for a file we know to be resident in timeline directory.
    ///
    /// The returned [`ResidentLayer`] keeps a strong reference to the `DownloadedLayer`. The
    /// timeline's resident physical size metric is increased by the file size.
    pub(crate) fn for_resident(
        conf: &'static PageServerConf,
        timeline: &Arc<Timeline>,
        local_path: Utf8PathBuf,
        file_name: LayerName,
        metadata: LayerFileMetadata,
    ) -> ResidentLayer {
        let desc = PersistentLayerDesc::from_filename(
            timeline.tenant_shard_id,
            timeline.timeline_id,
            file_name,
            metadata.file_size(),
        );

        let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);

        let mut resident = None;

        // `new_cyclic` hands out the `Weak` to the not-yet-constructed `LayerInner`, so the
        // `DownloadedLayer` can point back at its owner from the start.
        let owner = Layer(Arc::new_cyclic(|owner| {
            let inner = Arc::new(DownloadedLayer {
                owner: owner.clone(),
                kind: tokio::sync::OnceCell::default(),
                version: 0,
            });
            resident = Some(inner.clone());

            LayerInner::new(
                conf,
                timeline,
                local_path,
                access_stats,
                desc,
                Some(inner),
                metadata.generation,
                metadata.shard,
            )
        }));

        let downloaded = resident.expect("just initialized");

        // debug builds only: a resident layer must not be reported as needing a download
        debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());

        timeline
            .metrics
            .resident_physical_size_add(metadata.file_size());

        ResidentLayer { downloaded, owner }
    }

    /// Creates a Layer value for a freshly written new layer file by renaming it from a
    /// temporary path.
    ///
    /// Generation and shard are taken from the timeline, and a `LayerCreate` residence event is
    /// recorded.
    ///
    /// # Errors
    ///
    /// Returns an error if renaming `temp_path` to the layer's local path fails.
    pub(crate) fn finish_creating(
        conf: &'static PageServerConf,
        timeline: &Arc<Timeline>,
        desc: PersistentLayerDesc,
        temp_path: &Utf8Path,
    ) -> anyhow::Result<ResidentLayer> {
        let mut resident = None;

        // see `for_resident` for why `new_cyclic` is used
        let owner = Layer(Arc::new_cyclic(|owner| {
            let inner = Arc::new(DownloadedLayer {
                owner: owner.clone(),
                kind: tokio::sync::OnceCell::default(),
                version: 0,
            });
            resident = Some(inner.clone());
            let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
            access_stats.record_residence_event(
                LayerResidenceStatus::Resident,
                LayerResidenceEventReason::LayerCreate,
            );

            let local_path = local_layer_path(
                conf,
                &timeline.tenant_shard_id,
                &timeline.timeline_id,
                &desc.layer_name(),
                &timeline.generation,
            );

            LayerInner::new(
                conf,
                timeline,
                local_path,
                access_stats,
                desc,
                Some(inner),
                timeline.generation,
                timeline.get_shard_index(),
            )
        }));

        let downloaded = resident.expect("just initialized");

        // if the rename works, the path is as expected
        // TODO: sync system call
        std::fs::rename(temp_path, owner.local_path())
            .with_context(|| format!("rename temporary file as correct path for {owner}"))?;

        Ok(ResidentLayer { downloaded, owner })
    }

    /// Requests the layer to be evicted and waits for this to be done.
    ///
    /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
    ///
    /// If, due to bad luck or a blocked executor, we miss the actual eviction and the layer is
    /// re-downloaded, [`EvictionError::Downloaded`] is returned.
    ///
    /// The timeout is mandatory, because waiting for eviction is only needed for our tests.
    /// Eviction will happen whether or not the future returned by this method completes, unless
    /// there is a read access before eviction gets to complete.
    ///
    /// Technically cancellation safe, but cancelling might shift the viewpoint of which
    /// generation of the download-evict cycle is observed on retry.
    pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
        self.0.evict_and_wait(timeout).await
    }

    /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
    /// then.
    ///
    /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
    /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
    /// the value this is called on gets dropped.
    ///
    /// This is ensured by both of those methods accepting references to Layer.
    ///
    /// Calling this more than once has no further effect.
    ///
    /// [gc]: crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_gc_update
    /// [compaction]: crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_compaction_update
    pub(crate) fn delete_on_drop(&self) {
        self.0.delete_on_drop();
    }

    /// Return data needed to reconstruct given page at LSN.
    ///
    /// It is up to the caller to collect more data from the previous layer and
    /// perform WAL redo, if necessary.
    ///
    /// Downloads the layer first if needed, and records a `GetValueReconstructData` access.
    ///
    /// # Errors
    ///
    /// Fails if the download fails, or if `key`/`lsn_range` fall outside this layer: for a delta
    /// layer `lsn_range.start` must not precede the layer's LSN range start; for an image layer
    /// both ends of `lsn_range` must be at or after the image LSN.
    ///
    /// # Cancellation-Safety
    ///
    /// This method is cancellation-safe.
    pub(crate) async fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_data: &mut ValueReconstructState,
        ctx: &RequestContext,
    ) -> anyhow::Result<ValueReconstructResult> {
        use anyhow::ensure;

        let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
        self.0
            .access_stats
            .record_access(LayerAccessKind::GetValueReconstructData, ctx);

        if self.layer_desc().is_delta {
            ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
            ensure!(self.layer_desc().key_range.contains(&key));
        } else {
            ensure!(self.layer_desc().key_range.contains(&key));
            ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
            ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
        }

        layer
            .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
            .instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
            .await
            .with_context(|| format!("get_value_reconstruct_data for layer {self}"))
    }

    /// Vectored variant of [`Layer::get_value_reconstruct_data`]: collects reconstruct data for
    /// every key in `keyspace` within `lsn_range` into `reconstruct_data`.
    ///
    /// Downloads the layer first if needed, and records a `GetValueReconstructData` access.
    ///
    /// # Errors
    ///
    /// Download failures are wrapped as [`GetVectoredError::Other`]. `Other` errors from the
    /// read get this layer added as context; other error variants are passed through unchanged.
    pub(crate) async fn get_values_reconstruct_data(
        &self,
        keyspace: KeySpace,
        lsn_range: Range<Lsn>,
        reconstruct_data: &mut ValuesReconstructState,
        ctx: &RequestContext,
    ) -> Result<(), GetVectoredError> {
        let layer = self
            .0
            .get_or_maybe_download(true, Some(ctx))
            .await
            .map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;

        self.0
            .access_stats
            .record_access(LayerAccessKind::GetValueReconstructData, ctx);

        layer
            .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
            .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
            .await
            .map_err(|err| match err {
                GetVectoredError::Other(err) => GetVectoredError::Other(
                    err.context(format!("get_values_reconstruct_data for layer {self}")),
                ),
                err => err,
            })
    }

    /// Download the layer if evicted.
    ///
    /// Will not error when the layer is already downloaded.
    pub(crate) async fn download(&self) -> anyhow::Result<()> {
        self.0.get_or_maybe_download(true, None).await?;
        Ok(())
    }

    /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
    /// while the guard exists.
    ///
    /// Returns None if the layer is currently evicted or becoming evicted.
    #[cfg(test)]
    pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
        let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;

        Some(ResidentLayer {
            downloaded,
            owner: self.clone(),
        })
    }

    /// Weak indicator of whether the layer is resident or not. Good enough for eviction, which
    /// can deal with `EvictionError::NotFound`.
    ///
    /// Returns `true` if this layer might be resident, or `false` if it is most likely evicted or
    /// will be unless a read happens soon.
    pub(crate) fn is_likely_resident(&self) -> bool {
        self.0
            .inner
            .get()
            .map(|rowe| rowe.is_likely_resident())
            .unwrap_or(false)
    }

    /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
    pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
        let downloaded = self.0.get_or_maybe_download(true, None).await?;

        Ok(ResidentLayer {
            downloaded,
            owner: self.clone(),
        })
    }

    /// Returns the management API view of this layer; `reset` controls how access stats are
    /// reset while reading them.
    pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
        self.0.info(reset)
    }

    /// Borrows this layer's access statistics.
    pub(crate) fn access_stats(&self) -> &LayerAccessStats {
        &self.0.access_stats
    }

    /// Full local filesystem path of the layer file.
    pub(crate) fn local_path(&self) -> &Utf8Path {
        &self.0.path
    }

    /// `timelines/{timeline_id}/{layer_name}` string, used as a traversal id.
    pub(crate) fn debug_str(&self) -> &Arc<str> {
        &self.0.debug_str
    }

    /// Returns the layer's file metadata.
    pub(crate) fn metadata(&self) -> LayerFileMetadata {
        self.0.metadata()
    }

    /// Returns the owning timeline's id, or `None` if the timeline has already been dropped.
    pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
        self.0
            .timeline
            .upgrade()
            .map(|timeline| timeline.timeline_id)
    }

    /// Traditional debug dumping facility
    #[allow(unused)]
    pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
        self.0.desc.dump();

        if verbose {
            // for now, unconditionally download everything, even if that might not be wanted.
            let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
            l.dump(&self.0, ctx).await?
        }

        Ok(())
    }

    /// Waits until this layer has been dropped (and if needed, local file deletion and remote
    /// deletion scheduling has completed).
    ///
    /// Does not start local deletion; use [`Self::delete_on_drop`] for that
    /// separately.
    #[cfg(any(feature = "testing", test))]
    pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
        let mut rx = self.0.status.as_ref().unwrap().subscribe();

        async move {
            // `changed()` only errors once the sender is gone, i.e. the drop has completed
            loop {
                if rx.changed().await.is_err() {
                    break;
                }
            }
        }
    }
}
494 :
/// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
///
/// However when we want something evicted, we cannot evict it right away as there might be current
/// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
/// read with [`Layer::get_value_reconstruct_data`].
///
/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
#[derive(Debug)]
enum ResidentOrWantedEvicted {
    /// Holds a strong reference to the downloaded layer.
    Resident(Arc<DownloadedLayer>),
    /// Eviction has been requested: only a weak reference is kept, together with the
    /// `DownloadedLayer::version` that was resident when the eviction was requested.
    WantedEvicted(Weak<DownloadedLayer>, usize),
}
507 :
508 : impl ResidentOrWantedEvicted {
509 : /// Non-mutating access to the a DownloadedLayer, if possible.
510 : ///
511 : /// This is not used on the read path (anything that calls
512 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
513 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
514 : #[cfg(test)]
515 14 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
516 14 : match self {
517 14 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
518 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
519 : }
520 14 : }
521 :
522 : /// Best-effort query for residency right now, not as strong guarantee as receiving a strong
523 : /// reference from `ResidentOrWantedEvicted::get`.
524 38 : fn is_likely_resident(&self) -> bool {
525 38 : match self {
526 32 : ResidentOrWantedEvicted::Resident(_) => true,
527 6 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
528 : }
529 38 : }
530 :
531 : /// Upgrades any weak to strong if possible.
532 : ///
533 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
534 : /// happened.
535 209740 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
536 209740 : match self {
537 209732 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
538 8 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
539 0 : Some(strong) => {
540 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
541 0 :
542 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
543 0 :
544 0 : Some((strong, true))
545 : }
546 8 : None => None,
547 : },
548 : }
549 209740 : }
550 :
551 : /// When eviction is first requested, drop down to holding a [`Weak`].
552 : ///
553 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
554 : /// drop the possibly last strong reference outside of the mutex of
555 : /// [`heavier_once_cell::OnceCell`].
556 30 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
557 30 : match self {
558 26 : ResidentOrWantedEvicted::Resident(strong) => {
559 26 : let weak = Arc::downgrade(strong);
560 26 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
561 26 : std::mem::swap(self, &mut temp);
562 26 : match temp {
563 26 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
564 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
565 : }
566 : }
567 4 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
568 : }
569 30 : }
570 : }
571 :
/// Shared state behind every clone of a [`Layer`] handle.
struct LayerInner {
    /// Only needed to check `ondemand_download_behavior_treat_error_as_warn` and creation of
    /// [`Self::path`].
    conf: &'static PageServerConf,

    /// Full path to the file; unclear if this should exist anymore.
    path: Utf8PathBuf,

    /// String representation of the layer, used for traversal id.
    debug_str: Arc<str>,

    desc: PersistentLayerDesc,

    /// Timeline access is needed for remote timeline client and metrics.
    ///
    /// There should not be an access to timeline for any reason without entering the
    /// [`Timeline::gate`] at the same time.
    timeline: Weak<Timeline>,

    /// Cached knowledge of [`Timeline::remote_client`] being `Some`.
    have_remote_client: bool,

    access_stats: LayerAccessStats,

    /// This custom OnceCell is backed by std mutex, but only held for short time periods.
    ///
    /// Filesystem changes (download, evict) are only done while holding a permit which the
    /// `heavier_once_cell` provides.
    ///
    /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
    /// possibly read while not holding it.
    inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,

    /// Do we want to delete locally and remotely this when `LayerInner` is dropped
    wanted_deleted: AtomicBool,

    /// Version is to make sure we will only evict a specific initialization of the downloaded file.
    ///
    /// Incremented for each initialization, stored in `DownloadedLayer::version` or
    /// `ResidentOrWantedEvicted::WantedEvicted`.
    version: AtomicUsize,

    /// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
    /// starts, or completes.
    ///
    /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
    /// Holding the InitPermit is the only time we can do state transitions, but we also need to
    /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
    /// [`ResidentOrWantedEvicted::Resident`] on access.
    ///
    /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
    status: Option<tokio::sync::watch::Sender<Status>>,

    /// Counter for exponential backoff with the download.
    ///
    /// This is atomic only for the purposes of having additional data only accessed while holding
    /// the InitPermit.
    consecutive_failures: AtomicUsize,

    /// The generation of this Layer.
    ///
    /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
    /// for created layers from [`Timeline::generation`].
    generation: Generation,

    /// The shard of this Layer.
    ///
    /// For layers created in this process, this will always be the [`ShardIndex`] of the
    /// current `ShardIdentity` (TODO: add link once it's introduced).
    ///
    /// For loaded layers, this may be some other value if the tenant has undergone
    /// a shard split since the layer was originally written.
    shard: ShardIndex,

    /// When the Layer was last evicted but has not been downloaded since.
    ///
    /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
    last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,

    #[cfg(test)]
    failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
}
654 :
655 : impl std::fmt::Display for LayerInner {
656 30 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
657 30 : write!(f, "{}", self.layer_desc().short_id())
658 30 : }
659 : }
660 :
661 : impl AsLayerDesc for LayerInner {
662 703332 : fn layer_desc(&self) -> &PersistentLayerDesc {
663 703332 : &self.desc
664 703332 : }
665 : }
666 :
/// Residency status published through `LayerInner::status` (a `tokio::sync::watch` channel).
///
/// `LayerInner::new` starts a layer as `Resident` when it is given a downloaded file, and as
/// `Evicted` otherwise.
#[derive(Debug, Clone, Copy)]
enum Status {
    /// The layer file is resident.
    Resident,
    /// The layer file has been evicted.
    Evicted,
    /// A download has started and not yet completed.
    Downloading,
}
673 :
impl Drop for LayerInner {
    /// Finalizes the layer when its last handle goes away.
    ///
    /// A pending eviction is only counted as cancelled; it is not carried out. If
    /// [`LayerInner::delete_on_drop`] was requested, the local file is removed and remote
    /// deletion is scheduled on a blocking task (`Self::spawn_blocking`), inside a
    /// `layer_delete` span. That task holds the `status` sender until it finishes, so
    /// `Layer::wait_drop` waiters are released only afterwards.
    fn drop(&mut self) {
        // if there was a pending eviction, mark it cancelled here to balance metrics
        if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
        {
            // eviction has already been started
            LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);

            // eviction request is intentionally not honored as no one is present to wait for it
            // and we could be delaying shutdown for nothing.
        }

        // `get_mut` needs no atomic ordering: we have exclusive access in `drop`
        if !*self.wanted_deleted.get_mut() {
            return;
        }

        let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);

        // move everything the blocking task needs out of `self`
        let path = std::mem::take(&mut self.path);
        let file_name = self.layer_desc().layer_name();
        let file_size = self.layer_desc().file_size;
        let timeline = self.timeline.clone();
        let meta = self.metadata();
        let status = self.status.take();

        Self::spawn_blocking(move || {
            let _g = span.entered();

            // carry this until we are finished for [`Layer::wait_drop`] support
            let _status = status;

            let Some(timeline) = timeline.upgrade() else {
                // no need to nag that timeline is gone: under normal situation on
                // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
                LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
                return;
            };

            // timeline access requires entering its gate; a closed gate is treated like a
            // gone timeline
            let Ok(_guard) = timeline.gate.enter() else {
                LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
                return;
            };

            let removed = match std::fs::remove_file(path) {
                Ok(()) => true,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    // until we no longer do detaches by removing all local files before removing the
                    // tenant from the global map, we will always get these errors even if we knew what
                    // is the latest state.
                    //
                    // we currently do not track the latest state, so we'll also end up here on evicted
                    // layers.
                    false
                }
                Err(e) => {
                    tracing::error!("failed to remove wanted deleted layer: {e}");
                    LAYER_IMPL_METRICS.inc_delete_removes_failed();
                    false
                }
            };

            // only a file we actually removed counted towards the resident size
            if removed {
                timeline.metrics.resident_physical_size_sub(file_size);
            }
            if let Some(remote_client) = timeline.remote_client.as_ref() {
                let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);

                if let Err(e) = res {
                    // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
                    // demonstrating this deadlock (without spawn_blocking): stop will drop
                    // queued items, which will have ResidentLayer's, and those drops would try
                    // to re-entrantly lock the RemoteTimelineClient inner state.
                    if !timeline.is_active() {
                        tracing::info!("scheduling deletion on drop failed: {e:#}");
                    } else {
                        tracing::warn!("scheduling deletion on drop failed: {e:#}");
                    }
                    LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
                } else {
                    LAYER_IMPL_METRICS.inc_completed_deletes();
                }
            }
        });
    }
}
759 :
760 : impl LayerInner {
761 : #[allow(clippy::too_many_arguments)]
762 1202 : fn new(
763 1202 : conf: &'static PageServerConf,
764 1202 : timeline: &Arc<Timeline>,
765 1202 : local_path: Utf8PathBuf,
766 1202 : access_stats: LayerAccessStats,
767 1202 : desc: PersistentLayerDesc,
768 1202 : downloaded: Option<Arc<DownloadedLayer>>,
769 1202 : generation: Generation,
770 1202 : shard: ShardIndex,
771 1202 : ) -> Self {
772 1202 : let (inner, version, init_status) = if let Some(inner) = downloaded {
773 1202 : let version = inner.version;
774 1202 : let resident = ResidentOrWantedEvicted::Resident(inner);
775 1202 : (
776 1202 : heavier_once_cell::OnceCell::new(resident),
777 1202 : version,
778 1202 : Status::Resident,
779 1202 : )
780 : } else {
781 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
782 : };
783 :
784 1202 : LayerInner {
785 1202 : conf,
786 1202 : debug_str: {
787 1202 : format!("timelines/{}/{}", timeline.timeline_id, desc.layer_name()).into()
788 1202 : },
789 1202 : path: local_path,
790 1202 : desc,
791 1202 : timeline: Arc::downgrade(timeline),
792 1202 : have_remote_client: timeline.remote_client.is_some(),
793 1202 : access_stats,
794 1202 : wanted_deleted: AtomicBool::new(false),
795 1202 : inner,
796 1202 : version: AtomicUsize::new(version),
797 1202 : status: Some(tokio::sync::watch::channel(init_status).0),
798 1202 : consecutive_failures: AtomicUsize::new(0),
799 1202 : generation,
800 1202 : shard,
801 1202 : last_evicted_at: std::sync::Mutex::default(),
802 1202 : #[cfg(test)]
803 1202 : failpoints: Default::default(),
804 1202 : }
805 1202 : }
806 :
807 274 : fn delete_on_drop(&self) {
808 274 : let res =
809 274 : self.wanted_deleted
810 274 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
811 274 :
812 274 : if res.is_ok() {
813 270 : LAYER_IMPL_METRICS.inc_started_deletes();
814 270 : }
815 274 : }
816 :
817 : /// Cancellation safe, however dropping the future and calling this method again might result
818 : /// in a new attempt to evict OR join the previously started attempt.
819 108 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
820 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
821 : assert!(self.have_remote_client);
822 :
823 : let mut rx = self.status.as_ref().unwrap().subscribe();
824 :
825 : {
826 : let current = rx.borrow_and_update();
827 : match &*current {
828 : Status::Resident => {
829 : // we might get lucky and evict this; continue
830 : }
831 : Status::Evicted | Status::Downloading => {
832 : // it is already evicted
833 : return Err(EvictionError::NotFound);
834 : }
835 : }
836 : }
837 :
838 : let strong = {
839 : match self.inner.get() {
840 : Some(mut either) => either.downgrade(),
841 : None => {
842 : // we already have a scheduled eviction, which just has not gotten to run yet.
843 : // it might still race with a read access, but that could also get cancelled,
844 : // so let's say this is not evictable.
845 : return Err(EvictionError::NotFound);
846 : }
847 : }
848 : };
849 :
850 : if strong.is_some() {
851 : // drop the DownloadedLayer outside of the holding the guard
852 : drop(strong);
853 :
854 : // idea here is that only one evicter should ever get to witness a strong reference,
855 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
856 : // cancelled eviction and signal us, like it currently does.
857 : //
858 : // a second concurrent evict_and_wait will not see a strong reference.
859 : LAYER_IMPL_METRICS.inc_started_evictions();
860 : }
861 :
862 : let changed = rx.changed();
863 : let changed = tokio::time::timeout(timeout, changed).await;
864 :
865 : let Ok(changed) = changed else {
866 : return Err(EvictionError::Timeout);
867 : };
868 :
869 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
870 :
871 : let current = rx.borrow_and_update();
872 :
873 : match &*current {
874 : // the easiest case
875 : Status::Evicted => Ok(()),
876 : // it surely was evicted in between, but then there was a new access now; we can't know
877 : // if it'll succeed so lets just call it evicted
878 : Status::Downloading => Ok(()),
879 : // either the download which was started after eviction completed already, or it was
880 : // never evicted
881 : Status::Resident => Err(EvictionError::Downloaded),
882 : }
883 : }
884 :
885 : /// Cancellation safe.
886 209748 : async fn get_or_maybe_download(
887 209748 : self: &Arc<Self>,
888 209748 : allow_download: bool,
889 209748 : ctx: Option<&RequestContext>,
890 209748 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
891 16 : let (weak, permit) = {
892 : // get_or_init_detached can:
893 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
894 : // - be slow (wait for semaphore permit or closing)
895 209748 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
896 :
897 209748 : let locked = self
898 209748 : .inner
899 209748 : .get_or_init_detached()
900 2 : .await
901 209748 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
902 209748 :
903 209748 : scopeguard::ScopeGuard::into_inner(init_cancelled);
904 :
905 209732 : match locked {
906 : // this path could had been a RwLock::read
907 209732 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
908 0 : Ok(Ok((strong, _))) => {
909 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
910 0 : // previously a `evict_and_wait` was received. this is the only place when we
911 0 : // send out an update without holding the InitPermit.
912 0 : //
913 0 : // note that we also have dropped the Guard; this is fine, because we just made
914 0 : // a state change and are holding a strong reference to be returned.
915 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
916 0 : LAYER_IMPL_METRICS
917 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
918 0 :
919 0 : return Ok(strong);
920 : }
921 8 : Ok(Err(guard)) => {
922 8 : // path to here: we won the eviction, the file should still be on the disk.
923 8 : let (weak, permit) = guard.take_and_deinit();
924 8 : (Some(weak), permit)
925 : }
926 8 : Err(permit) => (None, permit),
927 : }
928 : };
929 :
930 16 : if let Some(weak) = weak {
931 : // only drop the weak after dropping the heavier_once_cell guard
932 8 : assert!(
933 8 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
934 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
935 : );
936 8 : }
937 :
938 16 : let timeline = self
939 16 : .timeline
940 16 : .upgrade()
941 16 : .ok_or_else(|| DownloadError::TimelineShutdown)?;
942 :
943 : // count cancellations, which currently remain largely unexpected
944 16 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
945 :
946 : // check if we really need to be downloaded: this can happen if a read access won the
947 : // semaphore before eviction.
948 : //
949 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
950 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
951 16 : let needs_download = self
952 16 : .needs_download()
953 16 : .await
954 16 : .map_err(DownloadError::PreStatFailed);
955 16 :
956 16 : scopeguard::ScopeGuard::into_inner(init_cancelled);
957 :
958 16 : let needs_download = needs_download?;
959 :
960 16 : let Some(reason) = needs_download else {
961 : // the file is present locally because eviction has not had a chance to run yet
962 :
963 : #[cfg(test)]
964 8 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
965 2 : .await?;
966 :
967 6 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
968 6 :
969 6 : return Ok(self.initialize_after_layer_is_on_disk(permit));
970 : };
971 :
972 : // we must download; getting cancelled before spawning the download is not an issue as
973 : // any still running eviction would not find anything to evict.
974 :
975 8 : if let NeedsDownload::NotFile(ft) = reason {
976 0 : return Err(DownloadError::NotFile(ft));
977 8 : }
978 8 :
979 8 : if timeline.remote_client.as_ref().is_none() {
980 0 : return Err(DownloadError::NoRemoteStorage);
981 8 : }
982 :
983 8 : if let Some(ctx) = ctx {
984 2 : self.check_expected_download(ctx)?;
985 6 : }
986 :
987 8 : if !allow_download {
988 : // this is only used from tests, but it is hard to test without the boolean
989 2 : return Err(DownloadError::DownloadRequired);
990 6 : }
991 6 :
992 6 : let download_ctx = ctx
993 6 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
994 6 : .unwrap_or(RequestContext::new(
995 6 : TaskKind::LayerDownload,
996 6 : DownloadBehavior::Download,
997 6 : ));
998 :
999 6 : async move {
1000 6 : tracing::info!(%reason, "downloading on-demand");
1001 :
1002 6 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1003 6 : let res = self
1004 6 : .download_init_and_wait(timeline, permit, download_ctx)
1005 10 : .await?;
1006 6 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1007 6 : Ok(res)
1008 6 : }
1009 6 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
1010 10 : .await
1011 209748 : }
1012 :
1013 : /// Nag or fail per RequestContext policy
1014 2 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1015 2 : use crate::context::DownloadBehavior::*;
1016 2 : let b = ctx.download_behavior();
1017 2 : match b {
1018 2 : Download => Ok(()),
1019 : Warn | Error => {
1020 0 : tracing::info!(
1021 0 : "unexpectedly on-demand downloading for task kind {:?}",
1022 0 : ctx.task_kind()
1023 : );
1024 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1025 :
1026 0 : let really_error =
1027 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1028 :
1029 0 : if really_error {
1030 : // this check is only probablistic, seems like flakyness footgun
1031 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1032 : } else {
1033 0 : Ok(())
1034 : }
1035 : }
1036 : }
1037 2 : }
1038 :
1039 : /// Actual download, at most one is executed at the time.
1040 6 : async fn download_init_and_wait(
1041 6 : self: &Arc<Self>,
1042 6 : timeline: Arc<Timeline>,
1043 6 : permit: heavier_once_cell::InitPermit,
1044 6 : ctx: RequestContext,
1045 6 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1046 6 : debug_assert_current_span_has_tenant_and_timeline_id();
1047 6 :
1048 6 : let (tx, rx) = tokio::sync::oneshot::channel();
1049 6 :
1050 6 : let this: Arc<Self> = self.clone();
1051 :
1052 6 : let guard = timeline
1053 6 : .gate
1054 6 : .enter()
1055 6 : .map_err(|_| DownloadError::DownloadCancelled)?;
1056 :
1057 6 : Self::spawn(
1058 6 : async move {
1059 0 : let _guard = guard;
1060 0 :
1061 0 : // now that we have commited to downloading, send out an update to:
1062 0 : // - unhang any pending eviction
1063 0 : // - break out of evict_and_wait
1064 0 : this.status
1065 0 : .as_ref()
1066 0 : .unwrap()
1067 0 : .send_replace(Status::Downloading);
1068 6 :
1069 6 : #[cfg(test)]
1070 6 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1071 2 : .await
1072 6 : .unwrap();
1073 :
1074 94 : let res = this.download_and_init(timeline, permit, &ctx).await;
1075 :
1076 6 : if let Err(res) = tx.send(res) {
1077 0 : match res {
1078 0 : Ok(_res) => {
1079 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1080 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1081 : }
1082 0 : Err(e) => {
1083 0 : tracing::info!(
1084 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1085 : );
1086 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1087 : }
1088 : }
1089 6 : }
1090 6 : }
1091 6 : .in_current_span(),
1092 6 : );
1093 6 :
1094 10 : match rx.await {
1095 6 : Ok(Ok(res)) => Ok(res),
1096 0 : Ok(Err(e)) => {
1097 0 : // sleep already happened in the spawned task, if it was not cancelled
1098 0 : match e.downcast_ref::<remote_storage::DownloadError>() {
1099 : // If the download failed due to its cancellation token,
1100 : // propagate the cancellation error upstream.
1101 : Some(remote_storage::DownloadError::Cancelled) => {
1102 0 : Err(DownloadError::DownloadCancelled)
1103 : }
1104 : // FIXME: this is not embedding the error because historically it would had
1105 : // been output to compute, however that is no longer the case.
1106 0 : _ => Err(DownloadError::DownloadFailed),
1107 : }
1108 : }
1109 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1110 : }
1111 6 : }
1112 :
1113 6 : async fn download_and_init(
1114 6 : self: &Arc<LayerInner>,
1115 6 : timeline: Arc<Timeline>,
1116 6 : permit: heavier_once_cell::InitPermit,
1117 6 : ctx: &RequestContext,
1118 6 : ) -> anyhow::Result<Arc<DownloadedLayer>> {
1119 6 : let client = timeline
1120 6 : .remote_client
1121 6 : .as_ref()
1122 6 : .expect("checked before download_init_and_wait");
1123 :
1124 6 : let result = client
1125 6 : .download_layer_file(
1126 6 : &self.desc.layer_name(),
1127 6 : &self.metadata(),
1128 6 : &timeline.cancel,
1129 6 : ctx,
1130 6 : )
1131 88 : .await;
1132 :
1133 6 : match result {
1134 6 : Ok(size) => {
1135 6 : assert_eq!(size, self.desc.file_size);
1136 :
1137 6 : match self.needs_download().await {
1138 0 : Ok(Some(reason)) => {
1139 0 : // this is really a bug in needs_download or remote timeline client
1140 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1141 : }
1142 6 : Ok(None) => {
1143 6 : // as expected
1144 6 : }
1145 0 : Err(e) => {
1146 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1147 : }
1148 : }
1149 :
1150 6 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1151 6 : timeline
1152 6 : .metrics
1153 6 : .resident_physical_size_add(self.desc.file_size);
1154 6 : self.consecutive_failures.store(0, Ordering::Relaxed);
1155 6 :
1156 6 : let since_last_eviction = self
1157 6 : .last_evicted_at
1158 6 : .lock()
1159 6 : .unwrap()
1160 6 : .take()
1161 6 : .map(|ts| ts.elapsed());
1162 6 : if let Some(since_last_eviction) = since_last_eviction {
1163 6 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1164 6 : }
1165 :
1166 6 : self.access_stats.record_residence_event(
1167 6 : LayerResidenceStatus::Resident,
1168 6 : LayerResidenceEventReason::ResidenceChange,
1169 6 : );
1170 6 :
1171 6 : Ok(self.initialize_after_layer_is_on_disk(permit))
1172 : }
1173 0 : Err(e) => {
1174 0 : let consecutive_failures =
1175 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1176 0 :
1177 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1178 :
1179 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1180 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1181 0 : 1.5,
1182 0 : 60.0,
1183 0 : );
1184 0 :
1185 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1186 :
1187 : tokio::select! {
1188 : _ = tokio::time::sleep(backoff) => {},
1189 : _ = timeline.cancel.cancelled() => {},
1190 : };
1191 :
1192 0 : Err(e)
1193 : }
1194 : }
1195 6 : }
1196 :
1197 : /// Initializes the `Self::inner` to a "resident" state.
1198 : ///
1199 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1200 : /// before calling this method.
1201 : ///
1202 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1203 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1204 12 : fn initialize_after_layer_is_on_disk(
1205 12 : self: &Arc<LayerInner>,
1206 12 : permit: heavier_once_cell::InitPermit,
1207 12 : ) -> Arc<DownloadedLayer> {
1208 12 : debug_assert_current_span_has_tenant_and_timeline_id();
1209 12 :
1210 12 : // disable any scheduled but not yet running eviction deletions for this initialization
1211 12 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1212 12 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1213 12 :
1214 12 : let res = Arc::new(DownloadedLayer {
1215 12 : owner: Arc::downgrade(self),
1216 12 : kind: tokio::sync::OnceCell::default(),
1217 12 : version: next_version,
1218 12 : });
1219 12 :
1220 12 : let waiters = self.inner.initializer_count();
1221 12 : if waiters > 0 {
1222 0 : tracing::info!(waiters, "completing layer init for other tasks");
1223 12 : }
1224 :
1225 12 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1226 12 :
1227 12 : self.inner.set(value, permit);
1228 12 :
1229 12 : res
1230 12 : }
1231 :
1232 24 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1233 24 : match tokio::fs::metadata(&self.path).await {
1234 16 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1235 8 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1236 0 : Err(e) => Err(e),
1237 : }
1238 24 : }
1239 :
1240 24 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1241 24 : match self.path.metadata() {
1242 24 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1243 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1244 0 : Err(e) => Err(e),
1245 : }
1246 24 : }
1247 :
1248 40 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1249 40 : // in future, this should include sha2-256 validation of the file.
1250 40 : if !m.is_file() {
1251 0 : Err(NeedsDownload::NotFile(m.file_type()))
1252 40 : } else if m.len() != self.desc.file_size {
1253 0 : Err(NeedsDownload::WrongSize {
1254 0 : actual: m.len(),
1255 0 : expected: self.desc.file_size,
1256 0 : })
1257 : } else {
1258 40 : Ok(())
1259 : }
1260 40 : }
1261 :
1262 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1263 0 : let layer_name = self.desc.layer_name().to_string();
1264 0 :
1265 0 : let resident = self
1266 0 : .inner
1267 0 : .get()
1268 0 : .map(|rowe| rowe.is_likely_resident())
1269 0 : .unwrap_or(false);
1270 0 :
1271 0 : let access_stats = self.access_stats.as_api_model(reset);
1272 0 :
1273 0 : if self.desc.is_delta {
1274 0 : let lsn_range = &self.desc.lsn_range;
1275 0 :
1276 0 : HistoricLayerInfo::Delta {
1277 0 : layer_file_name: layer_name,
1278 0 : layer_file_size: self.desc.file_size,
1279 0 : lsn_start: lsn_range.start,
1280 0 : lsn_end: lsn_range.end,
1281 0 : remote: !resident,
1282 0 : access_stats,
1283 0 : }
1284 : } else {
1285 0 : let lsn = self.desc.image_layer_lsn();
1286 0 :
1287 0 : HistoricLayerInfo::Image {
1288 0 : layer_file_name: layer_name,
1289 0 : layer_file_size: self.desc.file_size,
1290 0 : lsn_start: lsn,
1291 0 : remote: !resident,
1292 0 : access_stats,
1293 0 : }
1294 : }
1295 0 : }
1296 :
1297 : /// `DownloadedLayer` is being dropped, so it calls this method.
1298 24 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1299 24 : let can_evict = self.have_remote_client;
1300 :
1301 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1302 : // though here it is very likely
1303 24 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1304 :
1305 24 : if !can_evict {
1306 : // it would be nice to assert this case out, but we are in drop
1307 0 : span.in_scope(|| {
1308 0 : tracing::error!("bug in struct Layer: ResidentOrWantedEvicted has been downgraded while we have no remote storage");
1309 0 : });
1310 0 : return;
1311 24 : }
1312 24 :
1313 24 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1314 24 : // drop while the `self.inner` is being locked, leading to a deadlock.
1315 24 :
1316 24 : let start_evicting = async move {
1317 24 : #[cfg(test)]
1318 24 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1319 14 : .await
1320 24 : .expect("failpoint should not have errored");
1321 24 :
1322 24 : tracing::debug!("eviction started");
1323 :
1324 24 : let res = self.wait_for_turn_and_evict(only_version).await;
1325 : // metrics: ignore the Ok branch, it is not done yet
1326 24 : if let Err(e) = res {
1327 6 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1328 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1329 18 : }
1330 24 : };
1331 :
1332 24 : Self::spawn(start_evicting.instrument(span));
1333 24 : }
1334 :
1335 24 : async fn wait_for_turn_and_evict(
1336 24 : self: Arc<LayerInner>,
1337 24 : only_version: usize,
1338 24 : ) -> Result<(), EvictionCancelled> {
1339 46 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1340 46 : use Status::*;
1341 46 : match status {
1342 44 : Resident => Ok(()),
1343 2 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1344 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1345 : }
1346 46 : }
1347 :
1348 24 : let timeline = self
1349 24 : .timeline
1350 24 : .upgrade()
1351 24 : .ok_or(EvictionCancelled::TimelineGone)?;
1352 :
1353 24 : let mut rx = self
1354 24 : .status
1355 24 : .as_ref()
1356 24 : .expect("LayerInner cannot be dropped, holding strong ref")
1357 24 : .subscribe();
1358 24 :
1359 24 : is_good_to_continue(&rx.borrow_and_update())?;
1360 :
1361 22 : let Ok(_gate) = timeline.gate.enter() else {
1362 0 : return Err(EvictionCancelled::TimelineGone);
1363 : };
1364 :
1365 18 : let permit = {
1366 : // we cannot just `std::fs::remove_file` because there might already be an
1367 : // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
1368 : // operations must be done while holding the heavier_once_cell::InitPermit
1369 22 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1370 :
1371 22 : let waited = loop {
1372 22 : // we must race to the Downloading starting, otherwise we would have to wait until the
1373 22 : // completion of the download. waiting for download could be long and hinder our
1374 22 : // efforts to alert on "hanging" evictions.
1375 22 : tokio::select! {
1376 : res = &mut wait => break res,
1377 : _ = rx.changed() => {
1378 : is_good_to_continue(&rx.borrow_and_update())?;
1379 : // two possibilities for Status::Resident:
1380 : // - the layer was found locally from disk by a read
1381 : // - we missed a bunch of updates and now the layer is
1382 : // again downloaded -- assume we'll fail later on with
1383 : // version check or AlreadyReinitialized
1384 : }
1385 22 : }
1386 22 : };
1387 :
1388 : // re-check now that we have the guard or permit; all updates should have happened
1389 : // while holding the permit.
1390 22 : is_good_to_continue(&rx.borrow_and_update())?;
1391 :
1392 : // the term deinitialize is used here, because we clearing out the Weak will eventually
1393 : // lead to deallocating the reference counted value, and the value we
1394 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1395 22 : let (_weak, permit) = match waited {
1396 20 : Ok(guard) => {
1397 20 : match &*guard {
1398 18 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1399 18 : if *version == only_version =>
1400 16 : {
1401 16 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1402 16 : let (weak, permit) = guard.take_and_deinit();
1403 16 : (Some(weak), permit)
1404 : }
1405 2 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1406 2 : // if we were not doing the version check, we would need to try to
1407 2 : // upgrade the weak here to see if it really is dropped. version check
1408 2 : // is done instead assuming that it is cheaper.
1409 2 : tracing::debug!(
1410 : version,
1411 : only_version,
1412 0 : "version mismatch, not deinitializing"
1413 : );
1414 2 : return Err(EvictionCancelled::VersionCheckFailed);
1415 : }
1416 : ResidentOrWantedEvicted::Resident(_) => {
1417 2 : return Err(EvictionCancelled::AlreadyReinitialized);
1418 : }
1419 : }
1420 : }
1421 2 : Err(permit) => {
1422 2 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1423 2 : (None, permit)
1424 : }
1425 : };
1426 :
1427 18 : permit
1428 18 : };
1429 18 :
1430 18 : let span = tracing::Span::current();
1431 18 :
1432 18 : let spawned_at = std::time::Instant::now();
1433 18 :
1434 18 : // this is on purpose a detached spawn; we don't need to wait for it
1435 18 : //
1436 18 : // eviction completion reporting is the only thing hinging on this, and it can be just as
1437 18 : // well from a spawn_blocking thread.
1438 18 : //
1439 18 : // important to note that now that we've acquired the permit we have made sure the evicted
1440 18 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1441 18 : // there are multiple evictions. The rest is not cancellable, and we've now commited to
1442 18 : // evicting.
1443 18 : //
1444 18 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1445 18 : // reads. We will need to add cancellation for that if necessary.
1446 18 : Self::spawn_blocking(move || {
1447 18 : let _span = span.entered();
1448 18 :
1449 18 : let res = self.evict_blocking(&timeline, &permit);
1450 18 :
1451 18 : let waiters = self.inner.initializer_count();
1452 18 :
1453 18 : if waiters > 0 {
1454 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1455 18 : }
1456 :
1457 18 : let completed_in = spawned_at.elapsed();
1458 18 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1459 18 :
1460 18 : match res {
1461 18 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1462 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1463 : }
1464 :
1465 18 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1466 18 : });
1467 18 :
1468 18 : Ok(())
1469 24 : }
1470 :
1471 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1472 18 : fn evict_blocking(
1473 18 : &self,
1474 18 : timeline: &Timeline,
1475 18 : _permit: &heavier_once_cell::InitPermit,
1476 18 : ) -> Result<(), EvictionCancelled> {
1477 18 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1478 18 :
1479 18 : match capture_mtime_and_remove(&self.path) {
1480 18 : Ok(local_layer_mtime) => {
1481 18 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1482 18 : match duration {
1483 18 : Ok(elapsed) => {
1484 18 : timeline
1485 18 : .metrics
1486 18 : .evictions_with_low_residence_duration
1487 18 : .read()
1488 18 : .unwrap()
1489 18 : .observe(elapsed);
1490 18 : tracing::info!(
1491 0 : residence_millis = elapsed.as_millis(),
1492 0 : "evicted layer after known residence period"
1493 : );
1494 : }
1495 : Err(_) => {
1496 0 : tracing::info!("evicted layer after unknown residence period");
1497 : }
1498 : }
1499 18 : timeline.metrics.evictions.inc();
1500 18 : timeline
1501 18 : .metrics
1502 18 : .resident_physical_size_sub(self.desc.file_size);
1503 : }
1504 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1505 0 : tracing::error!(
1506 : layer_size = %self.desc.file_size,
1507 0 : "failed to evict layer from disk, it was already gone"
1508 : );
1509 0 : return Err(EvictionCancelled::FileNotFound);
1510 : }
1511 0 : Err(e) => {
1512 0 : // FIXME: this should probably be an abort
1513 0 : tracing::error!("failed to evict file from disk: {e:#}");
1514 0 : return Err(EvictionCancelled::RemoveFailed);
1515 : }
1516 : }
1517 :
1518 18 : self.access_stats.record_residence_event(
1519 18 : LayerResidenceStatus::Evicted,
1520 18 : LayerResidenceEventReason::ResidenceChange,
1521 18 : );
1522 18 :
1523 18 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1524 18 :
1525 18 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1526 18 :
1527 18 : Ok(())
1528 18 : }
1529 :
1530 1450 : fn metadata(&self) -> LayerFileMetadata {
1531 1450 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1532 1450 : }
1533 :
1534 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1535 : ///
1536 : /// Synchronizing with spawned tasks is very complicated otherwise.
1537 30 : fn spawn<F>(fut: F)
1538 30 : where
1539 30 : F: std::future::Future<Output = ()> + Send + 'static,
1540 30 : {
1541 30 : #[cfg(test)]
1542 30 : tokio::task::spawn(fut);
1543 30 : #[cfg(not(test))]
1544 30 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1545 30 : }
1546 :
1547 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1548 288 : fn spawn_blocking<F>(f: F)
1549 288 : where
1550 288 : F: FnOnce() + Send + 'static,
1551 288 : {
1552 288 : #[cfg(test)]
1553 288 : tokio::task::spawn_blocking(f);
1554 288 : #[cfg(not(test))]
1555 288 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1556 288 : }
1557 : }
1558 :
1559 18 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1560 18 : let m = path.metadata()?;
1561 18 : let local_layer_mtime = m.modified()?;
1562 18 : std::fs::remove_file(path)?;
1563 18 : Ok(local_layer_mtime)
1564 18 : }
1565 :
1566 0 : #[derive(Debug, thiserror::Error)]
1567 : pub(crate) enum EvictionError {
1568 : #[error("layer was already evicted")]
1569 : NotFound,
1570 :
1571 : /// Evictions must always lose to downloads in races, and this time it happened.
1572 : #[error("layer was downloaded instead")]
1573 : Downloaded,
1574 :
1575 : #[error("eviction did not happen within timeout")]
1576 : Timeout,
1577 : }
1578 :
1579 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1580 0 : #[derive(Debug, thiserror::Error)]
1581 : pub(crate) enum DownloadError {
1582 : #[error("timeline has already shutdown")]
1583 : TimelineShutdown,
1584 : #[error("no remote storage configured")]
1585 : NoRemoteStorage,
1586 : #[error("context denies downloading")]
1587 : ContextAndConfigReallyDeniesDownloads,
1588 : #[error("downloading is really required but not allowed by this method")]
1589 : DownloadRequired,
1590 : #[error("layer path exists, but it is not a file: {0:?}")]
1591 : NotFile(std::fs::FileType),
1592 : /// Why no error here? Because it will be reported by page_service. We should had also done
1593 : /// retries already.
1594 : #[error("downloading evicted layer file failed")]
1595 : DownloadFailed,
1596 : #[error("downloading failed, possibly for shutdown")]
1597 : DownloadCancelled,
1598 : #[error("pre-condition: stat before download failed")]
1599 : PreStatFailed(#[source] std::io::Error),
1600 :
1601 : #[cfg(test)]
1602 : #[error("failpoint: {0:?}")]
1603 : Failpoint(failpoints::FailpointKind),
1604 : }
1605 :
1606 : #[derive(Debug, PartialEq)]
1607 : pub(crate) enum NeedsDownload {
1608 : NotFound,
1609 : NotFile(std::fs::FileType),
1610 : WrongSize { actual: u64, expected: u64 },
1611 : }
1612 :
1613 : impl std::fmt::Display for NeedsDownload {
1614 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1615 6 : match self {
1616 6 : NeedsDownload::NotFound => write!(f, "file was not found"),
1617 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1618 0 : NeedsDownload::WrongSize { actual, expected } => {
1619 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1620 : }
1621 : }
1622 6 : }
1623 : }
1624 :
1625 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1626 : pub(crate) struct DownloadedLayer {
1627 : owner: Weak<LayerInner>,
1628 : // Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
1629 : // DownloadedLayer
1630 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
1631 : version: usize,
1632 : }
1633 :
1634 : impl std::fmt::Debug for DownloadedLayer {
1635 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1636 0 : f.debug_struct("DownloadedLayer")
1637 0 : // owner omitted because it is always "Weak"
1638 0 : .field("kind", &self.kind)
1639 0 : .field("version", &self.version)
1640 0 : .finish()
1641 0 : }
1642 : }
1643 :
1644 : impl Drop for DownloadedLayer {
1645 328 : fn drop(&mut self) {
1646 328 : if let Some(owner) = self.owner.upgrade() {
1647 24 : owner.on_downloaded_layer_drop(self.version);
1648 304 : } else {
1649 304 : // Layer::drop will handle cancelling the eviction; because of drop order and
1650 304 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1651 304 : }
1652 328 : }
1653 : }
1654 :
impl DownloadedLayer {
    /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`], or fails to
    /// initialize it permanently.
    ///
    /// `owner` parameter is a strong reference at the same `LayerInner` as the
    /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
    /// we will always have the LayerInner on the callstack, so we can just use it.
    ///
    /// Error handling is split in two:
    /// - a *transient* loading error is returned from the `init` future as its `Err`, and is
    ///   propagated to the caller with `?` after `get_or_try_init`;
    /// - a *permanent* loading error is returned from `init` as `Ok(Err(_))`, i.e. it becomes
    ///   the value stored in `self.kind`. It is logged and counted once here, and every caller
    ///   gets a stringified copy of it.
    ///
    /// NOTE(review): the retry-on-transient / remember-permanent behaviour relies on
    /// `heavier_once_cell`'s `get_or_try_init` not storing an `Err` from `init`. Confirm
    /// against that type.
    async fn get<'a>(
        &'a self,
        owner: &Arc<LayerInner>,
        ctx: &RequestContext,
    ) -> anyhow::Result<&'a LayerKind> {
        let init = || async {
            // `self.owner` is a Weak to the same LayerInner; compare pointers instead of
            // paying for an upgrade.
            assert_eq!(
                Weak::as_ptr(&self.owner),
                Arc::as_ptr(owner),
                "these are the same, just avoiding the upgrade"
            );

            // Both loaders return a nested result: the outer `Err` is treated as permanent,
            // the inner `Err` as transient (see the `match res` below).
            let res = if owner.desc.is_delta {
                let summary = Some(delta_layer::Summary::expected(
                    owner.desc.tenant_shard_id.tenant_id,
                    owner.desc.timeline_id,
                    owner.desc.key_range.clone(),
                    owner.desc.lsn_range.clone(),
                ));
                delta_layer::DeltaLayerInner::load(
                    &owner.path,
                    summary,
                    Some(owner.conf.max_vectored_read_bytes),
                    ctx,
                )
                .await
                .map(|res| res.map(LayerKind::Delta))
            } else {
                let lsn = owner.desc.image_layer_lsn();
                let summary = Some(image_layer::Summary::expected(
                    owner.desc.tenant_shard_id.tenant_id,
                    owner.desc.timeline_id,
                    owner.desc.key_range.clone(),
                    lsn,
                ));
                image_layer::ImageLayerInner::load(
                    &owner.path,
                    lsn,
                    summary,
                    Some(owner.conf.max_vectored_read_bytes),
                    ctx,
                )
                .await
                .map(|res| res.map(LayerKind::Image))
            };

            // Re-shape into what `get_or_try_init` expects: `Err` = transient (not stored),
            // `Ok(Err)` = permanent failure stored as the cell's value.
            match res {
                Ok(Ok(layer)) => Ok(Ok(layer)),
                Ok(Err(transient)) => Err(transient),
                Err(permanent) => {
                    LAYER_IMPL_METRICS.inc_permanent_loading_failures();
                    // TODO(#5815): we are not logging all errors, so temporarily log them **once**
                    // here as well
                    let permanent = permanent.context("load layer");
                    tracing::error!("layer loading failed permanently: {permanent:#}");
                    Ok(Err(permanent))
                }
            }
        };
        self.kind
            .get_or_try_init(init)
            // return transient errors using `?`
            .await?
            .as_ref()
            .map_err(|e| {
                // errors are not cloneable, cannot but stringify
                // test_broken_timeline matches this string
                anyhow::anyhow!("layer loading failed: {e:#}")
            })
    }

    /// Loads the layer via [`Self::get`] if needed, then looks up `key` in it.
    ///
    /// Delta layers are queried with `lsn_range`; image layers are queried without it.
    async fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_data: &mut ValueReconstructState,
        owner: &Arc<LayerInner>,
        ctx: &RequestContext,
    ) -> anyhow::Result<ValueReconstructResult> {
        use LayerKind::*;

        match self.get(owner, ctx).await? {
            Delta(d) => {
                d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
                    .await
            }
            Image(i) => {
                i.get_value_reconstruct_data(key, reconstruct_data, ctx)
                    .await
            }
        }
    }

    /// Vectored variant of [`Self::get_value_reconstruct_data`] over a whole `keyspace`.
    ///
    /// Loading failures from [`Self::get`] are converted into [`GetVectoredError`].
    async fn get_values_reconstruct_data(
        &self,
        keyspace: KeySpace,
        lsn_range: Range<Lsn>,
        reconstruct_data: &mut ValuesReconstructState,
        owner: &Arc<LayerInner>,
        ctx: &RequestContext,
    ) -> Result<(), GetVectoredError> {
        use LayerKind::*;

        match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
            Delta(d) => {
                d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
                    .await
            }
            Image(i) => {
                i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
                    .await
            }
        }
    }

    /// Loads the layer if needed and delegates to the inner layer's `dump`.
    async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
        use LayerKind::*;
        match self.get(owner, ctx).await? {
            Delta(d) => d.dump(ctx).await?,
            Image(i) => i.dump(ctx).await?,
        }

        Ok(())
    }
}
1787 :
1788 : /// Wrapper around an actual layer implementation.
1789 : #[derive(Debug)]
1790 : enum LayerKind {
1791 : Delta(delta_layer::DeltaLayerInner),
1792 : Image(image_layer::ImageLayerInner),
1793 : }
1794 :
1795 : /// Guard for forcing a layer be resident while it exists.
1796 : #[derive(Clone)]
1797 : pub(crate) struct ResidentLayer {
1798 : owner: Layer,
1799 : downloaded: Arc<DownloadedLayer>,
1800 : }
1801 :
1802 : impl std::fmt::Display for ResidentLayer {
1803 1436 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1804 1436 : write!(f, "{}", self.owner)
1805 1436 : }
1806 : }
1807 :
1808 : impl std::fmt::Debug for ResidentLayer {
1809 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1810 0 : write!(f, "{}", self.owner)
1811 0 : }
1812 : }
1813 :
1814 : impl ResidentLayer {
1815 : /// Release the eviction guard, converting back into a plain [`Layer`].
1816 : ///
1817 : /// You can access the [`Layer`] also by using `as_ref`.
1818 280 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1819 280 : self.into()
1820 280 : }
1821 :
1822 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1823 524 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1824 : pub(crate) async fn load_keys<'a>(
1825 : &'a self,
1826 : ctx: &RequestContext,
1827 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1828 : use LayerKind::*;
1829 :
1830 : let owner = &self.owner.0;
1831 :
1832 : match self.downloaded.get(owner, ctx).await? {
1833 : Delta(ref d) => {
1834 : owner
1835 : .access_stats
1836 : .record_access(LayerAccessKind::KeyIter, ctx);
1837 :
1838 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1839 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1840 : // while it's being held.
1841 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1842 : .await
1843 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1844 : }
1845 : Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
1846 : }
1847 : }
1848 :
1849 : /// Returns the amount of keys and values written to the writer.
1850 10 : pub(crate) async fn copy_delta_prefix(
1851 10 : &self,
1852 10 : writer: &mut super::delta_layer::DeltaLayerWriter,
1853 10 : until: Lsn,
1854 10 : ctx: &RequestContext,
1855 10 : ) -> anyhow::Result<usize> {
1856 10 : use LayerKind::*;
1857 10 :
1858 10 : let owner = &self.owner.0;
1859 10 :
1860 10 : match self.downloaded.get(owner, ctx).await? {
1861 10 : Delta(ref d) => d
1862 10 : .copy_prefix(writer, until, ctx)
1863 13 : .await
1864 10 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1865 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1866 : }
1867 10 : }
1868 :
1869 1072 : pub(crate) fn local_path(&self) -> &Utf8Path {
1870 1072 : &self.owner.0.path
1871 1072 : }
1872 :
1873 1174 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1874 1174 : self.owner.metadata()
1875 1174 : }
1876 :
1877 : #[cfg(test)]
1878 32 : pub(crate) async fn as_delta(
1879 32 : &self,
1880 32 : ctx: &RequestContext,
1881 32 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1882 32 : use LayerKind::*;
1883 32 : match self.downloaded.get(&self.owner.0, ctx).await? {
1884 32 : Delta(ref d) => Ok(d),
1885 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1886 : }
1887 32 : }
1888 : }
1889 :
1890 : impl AsLayerDesc for ResidentLayer {
1891 4146 : fn layer_desc(&self) -> &PersistentLayerDesc {
1892 4146 : self.owner.layer_desc()
1893 4146 : }
1894 : }
1895 :
1896 : impl AsRef<Layer> for ResidentLayer {
1897 1390 : fn as_ref(&self) -> &Layer {
1898 1390 : &self.owner
1899 1390 : }
1900 : }
1901 :
1902 : /// Drop the eviction guard.
1903 : impl From<ResidentLayer> for Layer {
1904 280 : fn from(value: ResidentLayer) -> Self {
1905 280 : value.owner
1906 280 : }
1907 : }
1908 :
1909 : use metrics::IntCounter;
1910 :
1911 : pub(crate) struct LayerImplMetrics {
1912 : started_evictions: IntCounter,
1913 : completed_evictions: IntCounter,
1914 : cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
1915 :
1916 : started_deletes: IntCounter,
1917 : completed_deletes: IntCounter,
1918 : failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
1919 :
1920 : rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
1921 : inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
1922 : redownload_after: metrics::Histogram,
1923 : time_to_evict: metrics::Histogram,
1924 : }
1925 :
1926 : impl Default for LayerImplMetrics {
1927 26 : fn default() -> Self {
1928 26 : use enum_map::Enum;
1929 26 :
1930 26 : // reminder: these will be pageserver_layer_* with "_total" suffix
1931 26 :
1932 26 : let started_evictions = metrics::register_int_counter!(
1933 : "pageserver_layer_started_evictions",
1934 : "Evictions started in the Layer implementation"
1935 : )
1936 26 : .unwrap();
1937 26 : let completed_evictions = metrics::register_int_counter!(
1938 : "pageserver_layer_completed_evictions",
1939 : "Evictions completed in the Layer implementation"
1940 : )
1941 26 : .unwrap();
1942 26 :
1943 26 : let cancelled_evictions = metrics::register_int_counter_vec!(
1944 : "pageserver_layer_cancelled_evictions_count",
1945 : "Different reasons for evictions to have been cancelled or failed",
1946 : &["reason"]
1947 : )
1948 26 : .unwrap();
1949 26 :
1950 234 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1951 234 : let reason = EvictionCancelled::from_usize(i);
1952 234 : let s = reason.as_str();
1953 234 : cancelled_evictions.with_label_values(&[s])
1954 234 : }));
1955 26 :
1956 26 : let started_deletes = metrics::register_int_counter!(
1957 : "pageserver_layer_started_deletes",
1958 : "Deletions on drop pending in the Layer implementation"
1959 : )
1960 26 : .unwrap();
1961 26 : let completed_deletes = metrics::register_int_counter!(
1962 : "pageserver_layer_completed_deletes",
1963 : "Deletions on drop completed in the Layer implementation"
1964 : )
1965 26 : .unwrap();
1966 26 :
1967 26 : let failed_deletes = metrics::register_int_counter_vec!(
1968 : "pageserver_layer_failed_deletes_count",
1969 : "Different reasons for deletions on drop to have failed",
1970 : &["reason"]
1971 : )
1972 26 : .unwrap();
1973 26 :
1974 52 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1975 52 : let reason = DeleteFailed::from_usize(i);
1976 52 : let s = reason.as_str();
1977 52 : failed_deletes.with_label_values(&[s])
1978 52 : }));
1979 26 :
1980 26 : let rare_counters = metrics::register_int_counter_vec!(
1981 : "pageserver_layer_assumed_rare_count",
1982 : "Times unexpected or assumed rare event happened",
1983 : &["event"]
1984 : )
1985 26 : .unwrap();
1986 26 :
1987 182 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1988 182 : let event = RareEvent::from_usize(i);
1989 182 : let s = event.as_str();
1990 182 : rare_counters.with_label_values(&[s])
1991 182 : }));
1992 26 :
1993 26 : let inits_cancelled = metrics::register_int_counter!(
1994 : "pageserver_layer_inits_cancelled_count",
1995 : "Times Layer initialization was cancelled",
1996 : )
1997 26 : .unwrap();
1998 26 :
1999 26 : let redownload_after = {
2000 26 : let minute = 60.0;
2001 26 : let hour = 60.0 * minute;
2002 : metrics::register_histogram!(
2003 : "pageserver_layer_redownloaded_after",
2004 : "Time between evicting and re-downloading.",
2005 : vec![
2006 : 10.0,
2007 : 30.0,
2008 : minute,
2009 : 5.0 * minute,
2010 : 15.0 * minute,
2011 : 30.0 * minute,
2012 : hour,
2013 : 12.0 * hour,
2014 : ]
2015 : )
2016 26 : .unwrap()
2017 26 : };
2018 26 :
2019 26 : let time_to_evict = metrics::register_histogram!(
2020 : "pageserver_layer_eviction_held_permit_seconds",
2021 : "Time eviction held the permit.",
2022 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
2023 : )
2024 26 : .unwrap();
2025 26 :
2026 26 : Self {
2027 26 : started_evictions,
2028 26 : completed_evictions,
2029 26 : cancelled_evictions,
2030 26 :
2031 26 : started_deletes,
2032 26 : completed_deletes,
2033 26 : failed_deletes,
2034 26 :
2035 26 : rare_counters,
2036 26 : inits_cancelled,
2037 26 : redownload_after,
2038 26 : time_to_evict,
2039 26 : }
2040 26 : }
2041 : }
2042 :
2043 : impl LayerImplMetrics {
2044 26 : fn inc_started_evictions(&self) {
2045 26 : self.started_evictions.inc();
2046 26 : }
2047 18 : fn inc_completed_evictions(&self) {
2048 18 : self.completed_evictions.inc();
2049 18 : }
2050 8 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2051 8 : self.cancelled_evictions[reason].inc()
2052 8 : }
2053 :
2054 270 : fn inc_started_deletes(&self) {
2055 270 : self.started_deletes.inc();
2056 270 : }
2057 270 : fn inc_completed_deletes(&self) {
2058 270 : self.completed_deletes.inc();
2059 270 : }
2060 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2061 0 : self.failed_deletes[reason].inc();
2062 0 : }
2063 :
2064 : /// Counted separatedly from failed layer deletes because we will complete the layer deletion
2065 : /// attempt regardless of failure to delete local file.
2066 0 : fn inc_delete_removes_failed(&self) {
2067 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2068 0 : }
2069 :
2070 : /// Expected rare just as cancellations are rare, but we could have cancellations separate from
2071 : /// the single caller which can start the download, so use this counter to separte them.
2072 0 : fn inc_init_completed_without_requester(&self) {
2073 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2074 0 : }
2075 :
2076 : /// Expected rare because cancellations are unexpected, and failures are unexpected
2077 0 : fn inc_download_failed_without_requester(&self) {
2078 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2079 0 : }
2080 :
2081 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2082 : ///
2083 : /// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
2084 : /// Option.
2085 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2086 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2087 0 : }
2088 :
2089 : /// These are only expected for [`Self::inc_init_cancelled`] amount when
2090 : /// running with remote storage.
2091 6 : fn inc_init_needed_no_download(&self) {
2092 6 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2093 6 : }
2094 :
2095 : /// Expected rare because all layer files should be readable and good
2096 0 : fn inc_permanent_loading_failures(&self) {
2097 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2098 0 : }
2099 :
2100 0 : fn inc_init_cancelled(&self) {
2101 0 : self.inits_cancelled.inc()
2102 0 : }
2103 :
2104 6 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2105 6 : self.redownload_after.observe(duration.as_secs_f64())
2106 6 : }
2107 :
2108 : /// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
2109 : /// instead cancel eviction if we would have read waiters. We cannot however separate reads
2110 : /// from other evictions, so this could have noise as well.
2111 0 : fn inc_evicted_with_waiters(&self) {
2112 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2113 0 : }
2114 :
2115 : /// Recorded at least initially as the permit is now acquired in async context before
2116 : /// spawn_blocking action.
2117 18 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2118 18 : self.time_to_evict.observe(duration.as_secs_f64())
2119 18 : }
2120 : }
2121 :
2122 : #[derive(Debug, Clone, Copy, enum_map::Enum)]
2123 : enum EvictionCancelled {
2124 : LayerGone,
2125 : TimelineGone,
2126 : VersionCheckFailed,
2127 : FileNotFound,
2128 : RemoveFailed,
2129 : AlreadyReinitialized,
2130 : /// Not evicted because of a pending reinitialization
2131 : LostToDownload,
2132 : /// After eviction, there was a new layer access which cancelled the eviction.
2133 : UpgradedBackOnAccess,
2134 : UnexpectedEvictedState,
2135 : }
2136 :
2137 : impl EvictionCancelled {
2138 234 : fn as_str(&self) -> &'static str {
2139 234 : match self {
2140 26 : EvictionCancelled::LayerGone => "layer_gone",
2141 26 : EvictionCancelled::TimelineGone => "timeline_gone",
2142 26 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2143 26 : EvictionCancelled::FileNotFound => "file_not_found",
2144 26 : EvictionCancelled::RemoveFailed => "remove_failed",
2145 26 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2146 26 : EvictionCancelled::LostToDownload => "lost_to_download",
2147 26 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2148 26 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2149 : }
2150 234 : }
2151 : }
2152 :
2153 : #[derive(enum_map::Enum)]
2154 : enum DeleteFailed {
2155 : TimelineGone,
2156 : DeleteSchedulingFailed,
2157 : }
2158 :
2159 : impl DeleteFailed {
2160 52 : fn as_str(&self) -> &'static str {
2161 52 : match self {
2162 26 : DeleteFailed::TimelineGone => "timeline_gone",
2163 26 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2164 : }
2165 52 : }
2166 : }
2167 :
2168 : #[derive(enum_map::Enum)]
2169 : enum RareEvent {
2170 : RemoveOnDropFailed,
2171 : InitCompletedWithoutRequester,
2172 : DownloadFailedWithoutRequester,
2173 : UpgradedWantedEvicted,
2174 : InitWithoutDownload,
2175 : PermanentLoadingFailure,
2176 : EvictedWithWaiters,
2177 : }
2178 :
2179 : impl RareEvent {
2180 182 : fn as_str(&self) -> &'static str {
2181 182 : use RareEvent::*;
2182 182 :
2183 182 : match self {
2184 26 : RemoveOnDropFailed => "remove_on_drop_failed",
2185 26 : InitCompletedWithoutRequester => "init_completed_without",
2186 26 : DownloadFailedWithoutRequester => "download_failed_without",
2187 26 : UpgradedWantedEvicted => "raced_wanted_evicted",
2188 26 : InitWithoutDownload => "init_needed_no_download",
2189 26 : PermanentLoadingFailure => "permanent_loading_failure",
2190 26 : EvictedWithWaiters => "evicted_with_waiters",
2191 : }
2192 182 : }
2193 : }
2194 :
2195 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2196 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|