Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::{
5 : HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
6 : };
7 : use pageserver_api::shard::{ShardIndex, TenantShardId};
8 : use std::ops::Range;
9 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10 : use std::sync::{Arc, Weak};
11 : use std::time::{Duration, SystemTime};
12 : use tracing::Instrument;
13 : use utils::id::TimelineId;
14 : use utils::lsn::Lsn;
15 : use utils::sync::heavier_once_cell;
16 :
17 : use crate::config::PageServerConf;
18 : use crate::context::{DownloadBehavior, RequestContext};
19 : use crate::repository::Key;
20 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
21 : use crate::task_mgr::TaskKind;
22 : use crate::tenant::timeline::GetVectoredError;
23 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
24 :
25 : use super::delta_layer::{self, DeltaEntry};
26 : use super::image_layer;
27 : use super::{
28 : AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerName, PersistentLayerDesc,
29 : ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
30 : };
31 :
32 : use utils::generation::Generation;
33 :
34 : #[cfg(test)]
35 : mod tests;
36 :
37 : #[cfg(test)]
38 : mod failpoints;
39 :
40 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
41 : /// range of LSNs.
42 : ///
43 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
44 : /// layers are used to ingest incoming WAL, and provide fast access to the
45 : /// recent page versions. On-disk layers are stored as files on disk, and are
46 : /// immutable. This type represents the on-disk kind, while the in-memory kind is represented by
47 : /// [`InMemoryLayer`].
48 : ///
49 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
50 : /// A delta layer contains all modifications within a range of LSNs and keys.
51 : /// An image layer is a snapshot of all the data in a key-range, at a single
52 : /// LSN.
53 : ///
54 : /// This type models the on-disk layers, which can be evicted and downloaded on demand. As a
55 : /// general goal, read accesses should always win over eviction, and eviction should not wait for
56 : /// download.
57 : ///
58 : /// ### State transitions
59 : ///
60 : /// The internal state of `Layer` is composed, most importantly, of the on-filesystem state and the
61 : /// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
62 : /// right size) or deleted.
63 : ///
64 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
65 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
66 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
67 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
68 : /// cancel the eviction.
69 : ///
70 : /// ```text
71 : /// +-----------------+ get_or_maybe_download +--------------------------------+
72 : /// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
73 : /// | ENOENT | /->| |
74 : /// +-----------------+ | +--------------------------------+
75 : /// ^ | | ^
76 : /// | get_or_maybe_download | | | get_or_maybe_download, either:
77 : /// evict_blocking | /-------------------------/ | | - upgrade weak to strong
78 : /// | | | | - re-initialize without download
79 : /// | | evict_and_wait | |
80 : /// +-----------------+ v |
81 : /// | not initialized | on_downloaded_layer_drop +--------------------------------------+
82 : /// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
83 : /// +-----------------+ +--------------------------------------+
84 : /// ```
85 : ///
86 : /// ### Unsupported
87 : ///
88 : /// - Evicting by the operator deleting files from the filesystem
89 : ///
90 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
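///
/// ### Example (sketch)
///
/// A hedged sketch of the intended interplay, not a doctest; it assumes a `layer: Layer` and a
/// tokio context are in scope:
///
/// ```ignore
/// // Pin the layer for use: downloads it on demand if it had been evicted.
/// let resident = layer.download_and_keep_resident().await?;
/// drop(resident);
///
/// // Ask for eviction and wait up to a timeout for it to happen. A concurrent read
/// // re-initializes the layer and makes this return `EvictionError::Downloaded`.
/// layer.evict_and_wait(std::time::Duration::from_secs(10)).await?;
/// ```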
91 : #[derive(Clone)]
92 : pub(crate) struct Layer(Arc<LayerInner>);
93 :
94 : impl std::fmt::Display for Layer {
95 1612 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 1612 : if matches!(self.0.generation, Generation::Broken) {
97 0 : write!(f, "{}-broken", self.layer_desc().short_id())
98 : } else {
99 1612 : write!(
100 1612 : f,
101 1612 : "{}{}",
102 1612 : self.layer_desc().short_id(),
103 1612 : self.0.generation.get_suffix()
104 1612 : )
105 : }
106 1612 : }
107 : }
108 :
109 : impl std::fmt::Debug for Layer {
110 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 0 : write!(f, "{}", self)
112 0 : }
113 : }
114 :
115 : impl AsLayerDesc for Layer {
116 713750 : fn layer_desc(&self) -> &PersistentLayerDesc {
117 713750 : self.0.layer_desc()
118 713750 : }
119 : }
120 :
121 : impl PartialEq for Layer {
122 4 : fn eq(&self, other: &Self) -> bool {
123 4 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
124 4 : }
125 : }
126 :
127 1302 : pub(crate) fn local_layer_path(
128 1302 : conf: &PageServerConf,
129 1302 : tenant_shard_id: &TenantShardId,
130 1302 : timeline_id: &TimelineId,
131 1302 : layer_file_name: &LayerName,
132 1302 : generation: &Generation,
133 1302 : ) -> Utf8PathBuf {
134 1302 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
135 1302 :
136 1302 : if generation.is_none() {
137 : // Without a generation, we may only use legacy path style
138 0 : timeline_path.join(layer_file_name.to_string())
139 : } else {
140 1302 : timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
141 : }
142 1302 : }
143 :
144 : impl Layer {
145 : /// Creates a layer value for a file we know not to be resident.
146 0 : pub(crate) fn for_evicted(
147 0 : conf: &'static PageServerConf,
148 0 : timeline: &Arc<Timeline>,
149 0 : file_name: LayerName,
150 0 : metadata: LayerFileMetadata,
151 0 : ) -> Self {
152 0 : let local_path = local_layer_path(
153 0 : conf,
154 0 : &timeline.tenant_shard_id,
155 0 : &timeline.timeline_id,
156 0 : &file_name,
157 0 : &metadata.generation,
158 0 : );
159 0 :
160 0 : let desc = PersistentLayerDesc::from_filename(
161 0 : timeline.tenant_shard_id,
162 0 : timeline.timeline_id,
163 0 : file_name,
164 0 : metadata.file_size(),
165 0 : );
166 0 :
167 0 : let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
168 0 :
169 0 : let owner = Layer(Arc::new(LayerInner::new(
170 0 : conf,
171 0 : timeline,
172 0 : local_path,
173 0 : access_stats,
174 0 : desc,
175 0 : None,
176 0 : metadata.generation,
177 0 : metadata.shard,
178 0 : )));
179 0 :
180 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
181 :
182 0 : owner
183 0 : }
184 :
185 : /// Creates a Layer value for a file we know to be resident in timeline directory.
186 24 : pub(crate) fn for_resident(
187 24 : conf: &'static PageServerConf,
188 24 : timeline: &Arc<Timeline>,
189 24 : local_path: Utf8PathBuf,
190 24 : file_name: LayerName,
191 24 : metadata: LayerFileMetadata,
192 24 : ) -> ResidentLayer {
193 24 : let desc = PersistentLayerDesc::from_filename(
194 24 : timeline.tenant_shard_id,
195 24 : timeline.timeline_id,
196 24 : file_name,
197 24 : metadata.file_size(),
198 24 : );
199 24 :
200 24 : let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
201 24 :
202 24 : let mut resident = None;
203 24 :
204 24 : let owner = Layer(Arc::new_cyclic(|owner| {
205 24 : let inner = Arc::new(DownloadedLayer {
206 24 : owner: owner.clone(),
207 24 : kind: tokio::sync::OnceCell::default(),
208 24 : version: 0,
209 24 : });
210 24 : resident = Some(inner.clone());
211 24 :
212 24 : LayerInner::new(
213 24 : conf,
214 24 : timeline,
215 24 : local_path,
216 24 : access_stats,
217 24 : desc,
218 24 : Some(inner),
219 24 : metadata.generation,
220 24 : metadata.shard,
221 24 : )
222 24 : }));
223 24 :
224 24 : let downloaded = resident.expect("just initialized");
225 24 :
226 24 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
227 :
228 24 : timeline
229 24 : .metrics
230 24 : .resident_physical_size_add(metadata.file_size());
231 24 :
232 24 : ResidentLayer { downloaded, owner }
233 24 : }
234 :
235 : /// Creates a Layer value for a freshly written-out new layer file by renaming it from a
236 : /// temporary path.
237 1294 : pub(crate) fn finish_creating(
238 1294 : conf: &'static PageServerConf,
239 1294 : timeline: &Arc<Timeline>,
240 1294 : desc: PersistentLayerDesc,
241 1294 : temp_path: &Utf8Path,
242 1294 : ) -> anyhow::Result<ResidentLayer> {
243 1294 : let mut resident = None;
244 1294 :
245 1294 : let owner = Layer(Arc::new_cyclic(|owner| {
246 1294 : let inner = Arc::new(DownloadedLayer {
247 1294 : owner: owner.clone(),
248 1294 : kind: tokio::sync::OnceCell::default(),
249 1294 : version: 0,
250 1294 : });
251 1294 : resident = Some(inner.clone());
252 1294 : let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
253 1294 : access_stats.record_residence_event(
254 1294 : LayerResidenceStatus::Resident,
255 1294 : LayerResidenceEventReason::LayerCreate,
256 1294 : );
257 1294 :
258 1294 : let local_path = local_layer_path(
259 1294 : conf,
260 1294 : &timeline.tenant_shard_id,
261 1294 : &timeline.timeline_id,
262 1294 : &desc.layer_name(),
263 1294 : &timeline.generation,
264 1294 : );
265 1294 :
266 1294 : LayerInner::new(
267 1294 : conf,
268 1294 : timeline,
269 1294 : local_path,
270 1294 : access_stats,
271 1294 : desc,
272 1294 : Some(inner),
273 1294 : timeline.generation,
274 1294 : timeline.get_shard_index(),
275 1294 : )
276 1294 : }));
277 1294 :
278 1294 : let downloaded = resident.expect("just initialized");
279 1294 :
280 1294 : // if the rename works, the path is as expected
281 1294 : // TODO: sync system call
282 1294 : std::fs::rename(temp_path, owner.local_path())
283 1294 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
284 :
285 1294 : Ok(ResidentLayer { downloaded, owner })
286 1294 : }
287 :
288 : /// Requests the layer to be evicted and waits for this to be done.
289 : ///
290 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
291 : ///
292 : /// If, due to bad luck or a blocked executor, we miss the actual eviction and the layer is
293 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
294 : ///
295 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
296 : /// will happen regardless of whether the future returned by this method completes, unless there
297 : /// is a read access before eviction gets to complete.
298 : ///
299 : /// Technically cancellation safe, but cancelling might shift the viewpoint to a different
300 : /// generation of the download-evict cycle on retry.
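///
/// A hedged sketch of handling the result (assumes a `layer: &Layer` is in scope):
///
/// ```ignore
/// match layer.evict_and_wait(Duration::from_secs(5)).await {
///     Ok(()) => { /* the local file was removed */ }
///     Err(EvictionError::NotFound) => { /* it was not resident to begin with */ }
///     Err(EvictionError::Downloaded) => { /* a read won the race and re-initialized it */ }
///     Err(EvictionError::Timeout) => { /* eviction did not complete within the timeout */ }
/// }
/// ```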
301 36 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
302 50 : self.0.evict_and_wait(timeout).await
303 32 : }
304 :
305 : /// Delete the layer file when `self` gets dropped, and also try to schedule a remote index
306 : /// upload afterwards.
307 : ///
308 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
309 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
310 : /// the value this is called on gets dropped.
311 : ///
312 : /// This is ensured by both of those methods accepting references to Layer.
313 : ///
314 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
315 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
316 340 : pub(crate) fn delete_on_drop(&self) {
317 340 : self.0.delete_on_drop();
318 340 : }
319 :
320 : /// Return data needed to reconstruct given page at LSN.
321 : ///
322 : /// It is up to the caller to collect more data from the previous layer and
323 : /// perform WAL redo, if necessary.
324 : ///
325 : /// # Cancellation-Safety
326 : ///
327 : /// This method is cancellation-safe.
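///
/// A hedged sketch of a caller's loop step (assumes `key`, `lsn_range`, a
/// `reconstruct_state: &mut ValueReconstructState`, and `ctx` are in scope):
///
/// ```ignore
/// match layer
///     .get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
///     .await?
/// {
///     ValueReconstructResult::Complete => { /* enough data collected; run WAL redo */ }
///     ValueReconstructResult::Continue => { /* descend to an older layer for more data */ }
///     ValueReconstructResult::Missing => { /* the key is not present in this layer */ }
/// }
/// ```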
328 210381 : pub(crate) async fn get_value_reconstruct_data(
329 210381 : &self,
330 210381 : key: Key,
331 210381 : lsn_range: Range<Lsn>,
332 210381 : reconstruct_data: &mut ValueReconstructState,
333 210381 : ctx: &RequestContext,
334 210381 : ) -> anyhow::Result<ValueReconstructResult> {
335 : use anyhow::ensure;
336 :
337 210381 : let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
338 210381 : self.0
339 210381 : .access_stats
340 210381 : .record_access(LayerAccessKind::GetValueReconstructData, ctx);
341 210381 :
342 210381 : if self.layer_desc().is_delta {
343 203442 : ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
344 203442 : ensure!(self.layer_desc().key_range.contains(&key));
345 : } else {
346 6939 : ensure!(self.layer_desc().key_range.contains(&key));
347 6939 : ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
348 6939 : ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
349 : }
350 :
351 210381 : layer
352 210381 : .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
353 210381 : .instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
354 29703 : .await
355 210381 : .with_context(|| format!("get_value_reconstruct_data for layer {self}"))
356 210381 : }
357 :
358 354 : pub(crate) async fn get_values_reconstruct_data(
359 354 : &self,
360 354 : keyspace: KeySpace,
361 354 : lsn_range: Range<Lsn>,
362 354 : reconstruct_data: &mut ValuesReconstructState,
363 354 : ctx: &RequestContext,
364 354 : ) -> Result<(), GetVectoredError> {
365 354 : let layer = self
366 354 : .0
367 354 : .get_or_maybe_download(true, Some(ctx))
368 0 : .await
369 354 : .map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;
370 :
371 354 : self.0
372 354 : .access_stats
373 354 : .record_access(LayerAccessKind::GetValueReconstructData, ctx);
374 354 :
375 354 : layer
376 354 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
377 354 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
378 11490 : .await
379 354 : .map_err(|err| match err {
380 0 : GetVectoredError::Other(err) => GetVectoredError::Other(
381 0 : err.context(format!("get_values_reconstruct_data for layer {self}")),
382 0 : ),
383 0 : err => err,
384 354 : })
385 354 : }
386 :
387 : /// Download the layer if evicted.
388 : ///
389 : /// Will not error when the layer is already downloaded.
390 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
391 0 : self.0.get_or_maybe_download(true, None).await?;
392 0 : Ok(())
393 0 : }
394 :
395 : /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
396 : /// while the guard exists.
397 : ///
398 : /// Returns None if the layer is currently evicted or becoming evicted.
399 : #[cfg(test)]
400 20 : pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
401 20 : let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
402 :
403 14 : Some(ResidentLayer {
404 14 : downloaded,
405 14 : owner: self.clone(),
406 14 : })
407 20 : }
408 :
409 : /// Weak indicator of whether the layer is resident or not. Good enough for eviction, which can
410 : /// deal with `EvictionError::NotFound`.
411 : ///
412 : /// Returns `true` if this layer might be resident, or `false` if it has most likely been evicted
413 : /// or will be, unless a read happens soon.
414 44 : pub(crate) fn is_likely_resident(&self) -> bool {
415 44 : self.0
416 44 : .inner
417 44 : .get()
418 44 : .map(|rowe| rowe.is_likely_resident())
419 44 : .unwrap_or(false)
420 44 : }
421 :
422 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
423 334 : pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
424 334 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
425 :
426 334 : Ok(ResidentLayer {
427 334 : downloaded,
428 334 : owner: self.clone(),
429 334 : })
430 334 : }
431 :
432 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
433 0 : self.0.info(reset)
434 0 : }
435 :
436 0 : pub(crate) fn access_stats(&self) -> &LayerAccessStats {
437 0 : &self.0.access_stats
438 0 : }
439 :
440 1296 : pub(crate) fn local_path(&self) -> &Utf8Path {
441 1296 : &self.0.path
442 1296 : }
443 :
444 210377 : pub(crate) fn debug_str(&self) -> &Arc<str> {
445 210377 : &self.0.debug_str
446 210377 : }
447 :
448 1290 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
449 1290 : self.0.metadata()
450 1290 : }
451 :
452 0 : pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
453 0 : self.0
454 0 : .timeline
455 0 : .upgrade()
456 0 : .map(|timeline| timeline.timeline_id)
457 0 : }
458 :
459 : /// Traditional debug dumping facility
460 : #[allow(unused)]
461 4 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
462 4 : self.0.desc.dump();
463 4 :
464 4 : if verbose {
465 : // for now, unconditionally download everything, even if that might not be wanted.
466 4 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
467 8 : l.dump(&self.0, ctx).await?
468 0 : }
469 :
470 4 : Ok(())
471 4 : }
472 :
473 : /// Waits until this layer has been dropped (and if needed, local file deletion and remote
474 : /// deletion scheduling has completed).
475 : ///
476 : /// Does not start local deletion; use [`Self::delete_on_drop`] for that
477 : /// separately.
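///
/// A hedged sketch combining the two (assumes this `layer` is the last clone):
///
/// ```ignore
/// let wait = layer.wait_drop();
/// layer.delete_on_drop();
/// drop(layer);
/// wait.await; // resolves once the drop-time deletion work has completed
/// ```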
478 : #[cfg(any(feature = "testing", test))]
479 2 : pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
480 2 : let mut rx = self.0.status.as_ref().unwrap().subscribe();
481 :
482 2 : async move {
483 : loop {
484 6 : if rx.changed().await.is_err() {
485 2 : break;
486 0 : }
487 : }
488 2 : }
489 2 : }
490 : }
491 :
492 : /// The downloaded state ([`DownloadedLayer`]) can be either resident or wanted evicted.
493 : ///
494 : /// However, when we want something evicted, we cannot evict it right away as there might be current
495 : /// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
496 : /// read with [`Layer::get_value_reconstruct_data`].
497 : ///
498 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
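///
/// A hedged sketch of the transition `evict_and_wait` performs via [`Self::downgrade`]
/// (assumes a `rowe: &mut ResidentOrWantedEvicted` currently holding a `Resident` value):
///
/// ```ignore
/// if let Some(last_strong) = rowe.downgrade() {
///     // `rowe` now holds only a `Weak`; dropping the strong reference outside of the
///     // OnceCell lock lets `DownloadedLayer::drop` schedule the actual eviction.
///     drop(last_strong);
/// }
/// ```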
499 : #[derive(Debug)]
500 : enum ResidentOrWantedEvicted {
501 : Resident(Arc<DownloadedLayer>),
502 : WantedEvicted(Weak<DownloadedLayer>, usize),
503 : }
504 :
505 : impl ResidentOrWantedEvicted {
506 : /// Non-mutating access to the DownloadedLayer, if possible.
507 : ///
508 : /// This is not used on the read path (anything that calls
509 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
510 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
511 : #[cfg(test)]
512 14 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
513 14 : match self {
514 14 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
515 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
516 : }
517 14 : }
518 :
519 : /// Best-effort query for residency right now; not as strong a guarantee as receiving a strong
520 : /// reference from `ResidentOrWantedEvicted::get`.
521 38 : fn is_likely_resident(&self) -> bool {
522 38 : match self {
523 32 : ResidentOrWantedEvicted::Resident(_) => true,
524 6 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
525 : }
526 38 : }
527 :
528 : /// Upgrades any weak to strong if possible.
529 : ///
530 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
531 : /// happened.
532 211077 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
533 211077 : match self {
534 211069 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
535 8 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
536 0 : Some(strong) => {
537 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
538 0 :
539 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
540 0 :
541 0 : Some((strong, true))
542 : }
543 8 : None => None,
544 : },
545 : }
546 211077 : }
547 :
548 : /// When eviction is first requested, drop down to holding a [`Weak`].
549 : ///
550 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
551 : /// drop the possibly last strong reference outside of the mutex of
552 : /// [`heavier_once_cell::OnceCell`].
553 30 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
554 30 : match self {
555 26 : ResidentOrWantedEvicted::Resident(strong) => {
556 26 : let weak = Arc::downgrade(strong);
557 26 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
558 26 : std::mem::swap(self, &mut temp);
559 26 : match temp {
560 26 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
561 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
562 : }
563 : }
564 4 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
565 : }
566 30 : }
567 : }
568 :
569 : struct LayerInner {
570 : /// Only needed for checking ondemand_download_behavior_treat_error_as_warn and for creating
571 : /// [`Self::path`].
572 : conf: &'static PageServerConf,
573 :
574 : /// Full path to the file; unclear if this should exist anymore.
575 : path: Utf8PathBuf,
576 :
577 : /// String representation of the layer, used for traversal id.
578 : debug_str: Arc<str>,
579 :
580 : desc: PersistentLayerDesc,
581 :
582 : /// Timeline access is needed for remote timeline client and metrics.
583 : ///
584 : /// There should be no access to the timeline for any reason without entering the
585 : /// [`Timeline::gate`] at the same time.
586 : timeline: Weak<Timeline>,
587 :
588 : access_stats: LayerAccessStats,
589 :
590 : /// This custom OnceCell is backed by a std mutex, which is only held for short periods.
591 : ///
592 : /// Filesystem changes (download, evict) are only done while holding a permit which the
593 : /// `heavier_once_cell` provides.
594 : ///
595 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
596 : /// possibly read while not holding it.
597 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
598 :
599 : /// Do we want to delete locally and remotely this when `LayerInner` is dropped
600 : wanted_deleted: AtomicBool,
601 :
602 : /// The version is used to make sure we only evict a specific initialization of the downloaded file.
603 : ///
604 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
605 : /// `ResidentOrWantedEvicted::WantedEvicted`.
606 : version: AtomicUsize,
607 :
608 : /// Allows subscribing to when the layer actually gets evicted, or when a non-cancellable
609 : /// download starts or completes.
610 : ///
611 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
612 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
613 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
614 : /// [`ResidentOrWantedEvicted::Resident`] on access.
615 : ///
616 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
617 : status: Option<tokio::sync::watch::Sender<Status>>,
618 :
619 : /// Counter for exponential backoff with the download.
620 : ///
621 : /// This is atomic only for the purposes of having additional data only accessed while holding
622 : /// the InitPermit.
623 : consecutive_failures: AtomicUsize,
624 :
625 : /// The generation of this Layer.
626 : ///
627 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
628 : /// for created layers from [`Timeline::generation`].
629 : generation: Generation,
630 :
631 : /// The shard of this Layer.
632 : ///
633 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
634 : /// current `ShardIdentity` (TODO: add link once it's introduced).
635 : ///
636 : /// For loaded layers, this may be some other value if the tenant has undergone
637 : /// a shard split since the layer was originally written.
638 : shard: ShardIndex,
639 :
640 : /// When the Layer was last evicted but has not been downloaded since.
641 : ///
642 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
643 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
644 :
645 : #[cfg(test)]
646 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
647 : }
648 :
649 : impl std::fmt::Display for LayerInner {
650 30 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
651 30 : write!(f, "{}", self.layer_desc().short_id())
652 30 : }
653 : }
654 :
655 : impl AsLayerDesc for LayerInner {
656 715455 : fn layer_desc(&self) -> &PersistentLayerDesc {
657 715455 : &self.desc
658 715455 : }
659 : }
660 :
661 : #[derive(Debug, Clone, Copy)]
662 : enum Status {
663 : Resident,
664 : Evicted,
665 : Downloading,
666 : }
667 :
668 : impl Drop for LayerInner {
669 371 : fn drop(&mut self) {
670 : // if there was a pending eviction, mark it cancelled here to balance metrics
671 371 : if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
672 2 : {
673 2 : // eviction has already been started
674 2 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
675 2 :
676 2 : // eviction request is intentionally not honored as no one is present to wait for it
677 2 : // and we could be delaying shutdown for nothing.
678 369 : }
679 :
680 371 : if !*self.wanted_deleted.get_mut() {
681 36 : return;
682 335 : }
683 :
684 335 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
685 :
686 335 : let path = std::mem::take(&mut self.path);
687 335 : let file_name = self.layer_desc().layer_name();
688 335 : let file_size = self.layer_desc().file_size;
689 335 : let timeline = self.timeline.clone();
690 335 : let meta = self.metadata();
691 335 : let status = self.status.take();
692 335 :
693 335 : Self::spawn_blocking(move || {
694 335 : let _g = span.entered();
695 335 :
696 335 : // carry this until we are finished for [`Layer::wait_drop`] support
697 335 : let _status = status;
698 :
699 335 : let Some(timeline) = timeline.upgrade() else {
700 : // no need to nag that timeline is gone: under normal situation on
701 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
702 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
703 0 : return;
704 : };
705 :
706 335 : let Ok(_guard) = timeline.gate.enter() else {
707 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
708 0 : return;
709 : };
710 :
711 335 : let removed = match std::fs::remove_file(path) {
712 333 : Ok(()) => true,
713 2 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
714 2 : // until we no longer do detaches by removing all local files before removing the
715 2 : // tenant from the global map, we will always get these errors even if we knew
716 2 : // the latest state.
717 2 : //
718 2 : // we currently do not track the latest state, so we'll also end up here on evicted
719 2 : // layers.
720 2 : false
721 : }
722 0 : Err(e) => {
723 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
724 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
725 0 : false
726 : }
727 : };
728 :
729 335 : if removed {
730 333 : timeline.metrics.resident_physical_size_sub(file_size);
731 333 : }
732 335 : let res = timeline
733 335 : .remote_client
734 335 : .schedule_deletion_of_unlinked(vec![(file_name, meta)]);
735 :
736 335 : if let Err(e) = res {
737 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
738 : // demonstrating this deadlock (without spawn_blocking): stop will drop
739 : // queued items, which will have ResidentLayer's, and those drops would try
740 : // to re-entrantly lock the RemoteTimelineClient inner state.
741 0 : if !timeline.is_active() {
742 0 : tracing::info!("scheduling deletion on drop failed: {e:#}");
743 : } else {
744 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
745 : }
746 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
747 335 : } else {
748 335 : LAYER_IMPL_METRICS.inc_completed_deletes();
749 335 : }
750 335 : });
751 371 : }
752 : }
753 :
754 : impl LayerInner {
755 : #[allow(clippy::too_many_arguments)]
756 1318 : fn new(
757 1318 : conf: &'static PageServerConf,
758 1318 : timeline: &Arc<Timeline>,
759 1318 : local_path: Utf8PathBuf,
760 1318 : access_stats: LayerAccessStats,
761 1318 : desc: PersistentLayerDesc,
762 1318 : downloaded: Option<Arc<DownloadedLayer>>,
763 1318 : generation: Generation,
764 1318 : shard: ShardIndex,
765 1318 : ) -> Self {
766 1318 : let (inner, version, init_status) = if let Some(inner) = downloaded {
767 1318 : let version = inner.version;
768 1318 : let resident = ResidentOrWantedEvicted::Resident(inner);
769 1318 : (
770 1318 : heavier_once_cell::OnceCell::new(resident),
771 1318 : version,
772 1318 : Status::Resident,
773 1318 : )
774 : } else {
775 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
776 : };
777 :
778 1318 : LayerInner {
779 1318 : conf,
780 1318 : debug_str: {
781 1318 : format!("timelines/{}/{}", timeline.timeline_id, desc.layer_name()).into()
782 1318 : },
783 1318 : path: local_path,
784 1318 : desc,
785 1318 : timeline: Arc::downgrade(timeline),
786 1318 : access_stats,
787 1318 : wanted_deleted: AtomicBool::new(false),
788 1318 : inner,
789 1318 : version: AtomicUsize::new(version),
790 1318 : status: Some(tokio::sync::watch::channel(init_status).0),
791 1318 : consecutive_failures: AtomicUsize::new(0),
792 1318 : generation,
793 1318 : shard,
794 1318 : last_evicted_at: std::sync::Mutex::default(),
795 1318 : #[cfg(test)]
796 1318 : failpoints: Default::default(),
797 1318 : }
798 1318 : }
799 :
800 340 : fn delete_on_drop(&self) {
801 340 : let res =
802 340 : self.wanted_deleted
803 340 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
804 340 :
805 340 : if res.is_ok() {
806 336 : LAYER_IMPL_METRICS.inc_started_deletes();
807 336 : }
808 340 : }
809 :
810 : /// Cancellation safe, however dropping the future and calling this method again might result
811 : /// in a new attempt to evict OR join the previously started attempt.
812 108 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
813 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
814 : let mut rx = self.status.as_ref().unwrap().subscribe();
815 :
816 : {
817 : let current = rx.borrow_and_update();
818 : match &*current {
819 : Status::Resident => {
820 : // we might get lucky and evict this; continue
821 : }
822 : Status::Evicted | Status::Downloading => {
823 : // it is already evicted
824 : return Err(EvictionError::NotFound);
825 : }
826 : }
827 : }
828 :
829 : let strong = {
830 : match self.inner.get() {
831 : Some(mut either) => either.downgrade(),
832 : None => {
833 : // we already have a scheduled eviction, which just has not gotten to run yet.
834 : // it might still race with a read access, but that could also get cancelled,
835 : // so let's say this is not evictable.
836 : return Err(EvictionError::NotFound);
837 : }
838 : }
839 : };
840 :
841 : if strong.is_some() {
842 : // drop the DownloadedLayer outside of the holding the guard
843 : drop(strong);
844 :
845 : // idea here is that only one evicter should ever get to witness a strong reference,
846 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
847 : // cancelled eviction and signal us, like it currently does.
848 : //
849 : // a second concurrent evict_and_wait will not see a strong reference.
850 : LAYER_IMPL_METRICS.inc_started_evictions();
851 : }
852 :
853 : let changed = rx.changed();
854 : let changed = tokio::time::timeout(timeout, changed).await;
855 :
856 : let Ok(changed) = changed else {
857 : return Err(EvictionError::Timeout);
858 : };
859 :
860 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
861 :
862 : let current = rx.borrow_and_update();
863 :
864 : match &*current {
865 : // the easiest case
866 : Status::Evicted => Ok(()),
867 : // it surely was evicted in between, but then there was a new access now; we can't know
868 : // if it'll succeed so lets just call it evicted
869 : Status::Downloading => Ok(()),
870 : // either the download which was started after eviction completed already, or it was
871 : // never evicted
872 : Status::Resident => Err(EvictionError::Downloaded),
873 : }
874 : }
875 :
876 : /// Cancellation safe.
877 211085 : async fn get_or_maybe_download(
878 211085 : self: &Arc<Self>,
879 211085 : allow_download: bool,
880 211085 : ctx: Option<&RequestContext>,
881 211085 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
882 16 : let (weak, permit) = {
883 : // get_or_init_detached can:
884 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
885 : // - be slow (wait for semaphore permit or closing)
886 211085 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
887 :
888 211085 : let locked = self
889 211085 : .inner
890 211085 : .get_or_init_detached()
891 3 : .await
892 211085 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
893 211085 :
894 211085 : scopeguard::ScopeGuard::into_inner(init_cancelled);
895 :
896 211069 : match locked {
897 : // this path could have been a RwLock::read
898 211069 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
899 0 : Ok(Ok((strong, _))) => {
900 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
901 0 : // previously a `evict_and_wait` was received. this is the only place when we
902 0 : // send out an update without holding the InitPermit.
903 0 : //
904 0 : // note that we also have dropped the Guard; this is fine, because we just made
905 0 : // a state change and are holding a strong reference to be returned.
906 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
907 0 : LAYER_IMPL_METRICS
908 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
909 0 :
910 0 : return Ok(strong);
911 : }
912 8 : Ok(Err(guard)) => {
913 8 : // path to here: we won the eviction, the file should still be on the disk.
914 8 : let (weak, permit) = guard.take_and_deinit();
915 8 : (Some(weak), permit)
916 : }
917 8 : Err(permit) => (None, permit),
918 : }
919 : };
920 :
921 16 : if let Some(weak) = weak {
922 : // only drop the weak after dropping the heavier_once_cell guard
923 8 : assert!(
924 8 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
925 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
926 : );
927 8 : }
928 :
929 16 : let timeline = self
930 16 : .timeline
931 16 : .upgrade()
932 16 : .ok_or_else(|| DownloadError::TimelineShutdown)?;
933 :
934 : // count cancellations, which currently remain largely unexpected
935 16 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
936 :
937 : // check if we really need to be downloaded: this can happen if a read access won the
938 : // semaphore before eviction.
939 : //
940 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
941 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
942 16 : let needs_download = self
943 16 : .needs_download()
944 13 : .await
945 16 : .map_err(DownloadError::PreStatFailed);
946 16 :
947 16 : scopeguard::ScopeGuard::into_inner(init_cancelled);
948 :
949 16 : let needs_download = needs_download?;
950 :
951 16 : let Some(reason) = needs_download else {
952 : // the file is present locally because eviction has not had a chance to run yet
953 :
954 : #[cfg(test)]
955 8 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
956 2 : .await?;
957 :
958 6 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
959 6 :
960 6 : return Ok(self.initialize_after_layer_is_on_disk(permit));
961 : };
962 :
963 : // we must download; getting cancelled before spawning the download is not an issue as
964 : // any still running eviction would not find anything to evict.
965 :
966 8 : if let NeedsDownload::NotFile(ft) = reason {
967 0 : return Err(DownloadError::NotFile(ft));
968 8 : }
969 :
970 8 : if let Some(ctx) = ctx {
971 2 : self.check_expected_download(ctx)?;
972 6 : }
973 :
974 8 : if !allow_download {
975 : // this is only used from tests, but it is hard to test without the boolean
976 2 : return Err(DownloadError::DownloadRequired);
977 6 : }
978 6 :
979 6 : let download_ctx = ctx
980 6 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
981 6 : .unwrap_or(RequestContext::new(
982 6 : TaskKind::LayerDownload,
983 6 : DownloadBehavior::Download,
984 6 : ));
985 :
986 6 : async move {
987 6 : tracing::info!(%reason, "downloading on-demand");
988 :
989 6 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
990 6 : let res = self
991 6 : .download_init_and_wait(timeline, permit, download_ctx)
992 10 : .await?;
993 6 : scopeguard::ScopeGuard::into_inner(init_cancelled);
994 6 : Ok(res)
995 6 : }
996 6 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
997 10 : .await
998 211085 : }
999 :
1000 : /// Nag or fail per RequestContext policy
1001 2 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1002 2 : use crate::context::DownloadBehavior::*;
1003 2 : let b = ctx.download_behavior();
1004 2 : match b {
1005 2 : Download => Ok(()),
1006 : Warn | Error => {
1007 0 : tracing::info!(
1008 0 : "unexpectedly on-demand downloading for task kind {:?}",
1009 0 : ctx.task_kind()
1010 : );
1011 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1012 :
1013 0 : let really_error =
1014 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1015 :
1016 0 : if really_error {
1017 : // this check is only probabilistic, seems like a flakiness footgun
1018 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1019 : } else {
1020 0 : Ok(())
1021 : }
1022 : }
1023 : }
1024 2 : }
1025 :
1026 : /// Actual download; at most one is executed at a time.
1027 6 : async fn download_init_and_wait(
1028 6 : self: &Arc<Self>,
1029 6 : timeline: Arc<Timeline>,
1030 6 : permit: heavier_once_cell::InitPermit,
1031 6 : ctx: RequestContext,
1032 6 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1033 6 : debug_assert_current_span_has_tenant_and_timeline_id();
1034 6 :
1035 6 : let (tx, rx) = tokio::sync::oneshot::channel();
1036 6 :
1037 6 : let this: Arc<Self> = self.clone();
1038 :
1039 6 : let guard = timeline
1040 6 : .gate
1041 6 : .enter()
1042 6 : .map_err(|_| DownloadError::DownloadCancelled)?;
1043 :
1044 6 : Self::spawn(
1045 6 : async move {
1046 0 : let _guard = guard;
1047 0 :
1048 0 : // now that we have committed to downloading, send out an update to:
1049 0 : // - unhang any pending eviction
1050 0 : // - break out of evict_and_wait
1051 0 : this.status
1052 0 : .as_ref()
1053 0 : .unwrap()
1054 0 : .send_replace(Status::Downloading);
1055 6 :
1056 6 : #[cfg(test)]
1057 6 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1058 2 : .await
1059 6 : .unwrap();
1060 :
1061 93 : let res = this.download_and_init(timeline, permit, &ctx).await;
1062 :
1063 6 : if let Err(res) = tx.send(res) {
1064 0 : match res {
1065 0 : Ok(_res) => {
1066 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1067 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1068 : }
1069 0 : Err(e) => {
1070 0 : tracing::info!(
1071 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1072 : );
1073 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1074 : }
1075 : }
1076 6 : }
1077 6 : }
1078 6 : .in_current_span(),
1079 6 : );
1080 6 :
1081 10 : match rx.await {
1082 6 : Ok(Ok(res)) => Ok(res),
1083 0 : Ok(Err(e)) => {
1084 0 : // sleep already happened in the spawned task, if it was not cancelled
1085 0 : match e.downcast_ref::<remote_storage::DownloadError>() {
1086 : // If the download failed due to its cancellation token,
1087 : // propagate the cancellation error upstream.
1088 : Some(remote_storage::DownloadError::Cancelled) => {
1089 0 : Err(DownloadError::DownloadCancelled)
1090 : }
1091 : // FIXME: this is not embedding the error because historically it would have
1092 : // been output to compute, however that is no longer the case.
1093 0 : _ => Err(DownloadError::DownloadFailed),
1094 : }
1095 : }
1096 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1097 : }
1098 6 : }
1099 :
1100 6 : async fn download_and_init(
1101 6 : self: &Arc<LayerInner>,
1102 6 : timeline: Arc<Timeline>,
1103 6 : permit: heavier_once_cell::InitPermit,
1104 6 : ctx: &RequestContext,
1105 6 : ) -> anyhow::Result<Arc<DownloadedLayer>> {
1106 6 : let result = timeline
1107 6 : .remote_client
1108 6 : .download_layer_file(
1109 6 : &self.desc.layer_name(),
1110 6 : &self.metadata(),
1111 6 : &self.path,
1112 6 : &timeline.cancel,
1113 6 : ctx,
1114 6 : )
1115 87 : .await;
1116 :
1117 6 : match result {
1118 6 : Ok(size) => {
1119 6 : assert_eq!(size, self.desc.file_size);
1120 :
1121 6 : match self.needs_download().await {
1122 0 : Ok(Some(reason)) => {
1123 0 : // this is really a bug in needs_download or remote timeline client
1124 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1125 : }
1126 6 : Ok(None) => {
1127 6 : // as expected
1128 6 : }
1129 0 : Err(e) => {
1130 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1131 : }
1132 : }
1133 :
1134 6 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1135 6 : timeline
1136 6 : .metrics
1137 6 : .resident_physical_size_add(self.desc.file_size);
1138 6 : self.consecutive_failures.store(0, Ordering::Relaxed);
1139 6 :
1140 6 : let since_last_eviction = self
1141 6 : .last_evicted_at
1142 6 : .lock()
1143 6 : .unwrap()
1144 6 : .take()
1145 6 : .map(|ts| ts.elapsed());
1146 6 : if let Some(since_last_eviction) = since_last_eviction {
1147 6 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1148 6 : }
1149 :
1150 6 : self.access_stats.record_residence_event(
1151 6 : LayerResidenceStatus::Resident,
1152 6 : LayerResidenceEventReason::ResidenceChange,
1153 6 : );
1154 6 :
1155 6 : Ok(self.initialize_after_layer_is_on_disk(permit))
1156 : }
1157 0 : Err(e) => {
1158 0 : let consecutive_failures =
1159 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1160 0 :
1161 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1162 :
1163 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1164 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1165 0 : 1.5,
1166 0 : 60.0,
1167 0 : );
1168 0 :
1169 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1170 :
1171 : tokio::select! {
1172 : _ = tokio::time::sleep(backoff) => {},
1173 : _ = timeline.cancel.cancelled() => {},
1174 : };
1175 :
1176 0 : Err(e)
1177 : }
1178 : }
1179 6 : }
1180 :
1181 : /// Initializes the `Self::inner` to a "resident" state.
1182 : ///
1183 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1184 : /// before calling this method.
1185 : ///
1186 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1187 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1188 12 : fn initialize_after_layer_is_on_disk(
1189 12 : self: &Arc<LayerInner>,
1190 12 : permit: heavier_once_cell::InitPermit,
1191 12 : ) -> Arc<DownloadedLayer> {
1192 12 : debug_assert_current_span_has_tenant_and_timeline_id();
1193 12 :
1194 12 : // disable any scheduled but not yet running eviction deletions for this initialization
1195 12 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1196 12 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1197 12 :
1198 12 : let res = Arc::new(DownloadedLayer {
1199 12 : owner: Arc::downgrade(self),
1200 12 : kind: tokio::sync::OnceCell::default(),
1201 12 : version: next_version,
1202 12 : });
1203 12 :
1204 12 : let waiters = self.inner.initializer_count();
1205 12 : if waiters > 0 {
1206 0 : tracing::info!(waiters, "completing layer init for other tasks");
1207 12 : }
1208 :
1209 12 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1210 12 :
1211 12 : self.inner.set(value, permit);
1212 12 :
1213 12 : res
1214 12 : }
1215 :
1216 24 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1217 24 : match tokio::fs::metadata(&self.path).await {
1218 16 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1219 8 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1220 0 : Err(e) => Err(e),
1221 : }
1222 24 : }
1223 :
1224 24 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1225 24 : match self.path.metadata() {
1226 24 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1227 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1228 0 : Err(e) => Err(e),
1229 : }
1230 24 : }
1231 :
1232 40 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1233 40 : // in future, this should include sha2-256 validation of the file.
1234 40 : if !m.is_file() {
1235 0 : Err(NeedsDownload::NotFile(m.file_type()))
1236 40 : } else if m.len() != self.desc.file_size {
1237 0 : Err(NeedsDownload::WrongSize {
1238 0 : actual: m.len(),
1239 0 : expected: self.desc.file_size,
1240 0 : })
1241 : } else {
1242 40 : Ok(())
1243 : }
1244 40 : }
1245 :
1246 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1247 0 : let layer_name = self.desc.layer_name().to_string();
1248 0 :
1249 0 : let resident = self
1250 0 : .inner
1251 0 : .get()
1252 0 : .map(|rowe| rowe.is_likely_resident())
1253 0 : .unwrap_or(false);
1254 0 :
1255 0 : let access_stats = self.access_stats.as_api_model(reset);
1256 0 :
1257 0 : if self.desc.is_delta {
1258 0 : let lsn_range = &self.desc.lsn_range;
1259 0 :
1260 0 : HistoricLayerInfo::Delta {
1261 0 : layer_file_name: layer_name,
1262 0 : layer_file_size: self.desc.file_size,
1263 0 : lsn_start: lsn_range.start,
1264 0 : lsn_end: lsn_range.end,
1265 0 : remote: !resident,
1266 0 : access_stats,
1267 0 : l0: crate::tenant::layer_map::LayerMap::is_l0(self.layer_desc()),
1268 0 : }
1269 : } else {
1270 0 : let lsn = self.desc.image_layer_lsn();
1271 0 :
1272 0 : HistoricLayerInfo::Image {
1273 0 : layer_file_name: layer_name,
1274 0 : layer_file_size: self.desc.file_size,
1275 0 : lsn_start: lsn,
1276 0 : remote: !resident,
1277 0 : access_stats,
1278 0 : }
1279 : }
1280 0 : }
1281 :
1282 : /// `DownloadedLayer` is being dropped, so it calls this method.
1283 24 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1284 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1285 : // though here it is very likely
1286 24 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1287 :
1288 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1289 : // drop while the `self.inner` is being locked, leading to a deadlock.
1290 :
1291 24 : let start_evicting = async move {
1292 24 : #[cfg(test)]
1293 24 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1294 14 : .await
1295 24 : .expect("failpoint should not have errored");
1296 24 :
1297 24 : tracing::debug!("eviction started");
1298 :
1299 24 : let res = self.wait_for_turn_and_evict(only_version).await;
1300 : // metrics: ignore the Ok branch, it is not done yet
1301 24 : if let Err(e) = res {
1302 6 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1303 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1304 18 : }
1305 24 : };
1306 :
1307 24 : Self::spawn(start_evicting.instrument(span));
1308 24 : }
1309 :
1310 24 : async fn wait_for_turn_and_evict(
1311 24 : self: Arc<LayerInner>,
1312 24 : only_version: usize,
1313 24 : ) -> Result<(), EvictionCancelled> {
1314 46 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1315 46 : use Status::*;
1316 46 : match status {
1317 44 : Resident => Ok(()),
1318 2 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1319 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1320 : }
1321 46 : }
1322 :
1323 24 : let timeline = self
1324 24 : .timeline
1325 24 : .upgrade()
1326 24 : .ok_or(EvictionCancelled::TimelineGone)?;
1327 :
1328 24 : let mut rx = self
1329 24 : .status
1330 24 : .as_ref()
1331 24 : .expect("LayerInner cannot be dropped, holding strong ref")
1332 24 : .subscribe();
1333 24 :
1334 24 : is_good_to_continue(&rx.borrow_and_update())?;
1335 :
1336 22 : let Ok(_gate) = timeline.gate.enter() else {
1337 0 : return Err(EvictionCancelled::TimelineGone);
1338 : };
1339 :
1340 18 : let permit = {
1341 : // we cannot just `std::fs::remove_file` because there might already be a
1342 : // get_or_maybe_download which will inspect the filesystem and reinitialize. filesystem
1343 : // operations must be done while holding the heavier_once_cell::InitPermit
1344 22 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1345 :
1346 22 : let waited = loop {
1347 22 : // we must race against the transition to Downloading, otherwise we would have to wait until the
1348 22 : // completion of the download. waiting for download could be long and hinder our
1349 22 : // efforts to alert on "hanging" evictions.
1350 22 : tokio::select! {
1351 : res = &mut wait => break res,
1352 : _ = rx.changed() => {
1353 : is_good_to_continue(&rx.borrow_and_update())?;
1354 : // two possibilities for Status::Resident:
1355 : // - the layer was found locally from disk by a read
1356 : // - we missed a bunch of updates and now the layer is
1357 : // again downloaded -- assume we'll fail later on with
1358 : // version check or AlreadyReinitialized
1359 : }
1360 22 : }
1361 22 : };
1362 :
1363 : // re-check now that we have the guard or permit; all updates should have happened
1364 : // while holding the permit.
1365 22 : is_good_to_continue(&rx.borrow_and_update())?;
1366 :
1367 : // the term deinitialize is used here, because clearing out the Weak will eventually
1368 : // lead to deallocating the reference counted value, and the value we
1369 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1370 22 : let (_weak, permit) = match waited {
1371 20 : Ok(guard) => {
1372 20 : match &*guard {
1373 18 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1374 18 : if *version == only_version =>
1375 16 : {
1376 16 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1377 16 : let (weak, permit) = guard.take_and_deinit();
1378 16 : (Some(weak), permit)
1379 : }
1380 2 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1381 2 : // if we were not doing the version check, we would need to try to
1382 2 : // upgrade the weak here to see if it really is dropped. version check
1383 2 : // is done instead assuming that it is cheaper.
1384 2 : tracing::debug!(
1385 : version,
1386 : only_version,
1387 0 : "version mismatch, not deinitializing"
1388 : );
1389 2 : return Err(EvictionCancelled::VersionCheckFailed);
1390 : }
1391 : ResidentOrWantedEvicted::Resident(_) => {
1392 2 : return Err(EvictionCancelled::AlreadyReinitialized);
1393 : }
1394 : }
1395 : }
1396 2 : Err(permit) => {
1397 2 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1398 2 : (None, permit)
1399 : }
1400 : };
1401 :
1402 18 : permit
1403 18 : };
1404 18 :
1405 18 : let span = tracing::Span::current();
1406 18 :
1407 18 : let spawned_at = std::time::Instant::now();
1408 18 :
1409 18 : // this is on purpose a detached spawn; we don't need to wait for it
1410 18 : //
1411 18 : // eviction completion reporting is the only thing hinging on this, and it can be just as
1412 18 : // well from a spawn_blocking thread.
1413 18 : //
1414 18 : // important to note that now that we've acquired the permit we have made sure the evicted
1415 18 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1416 18 : // there are multiple evictions. The rest is not cancellable, and we've now commited to
1417 18 : // evicting.
1418 18 : //
1419 18 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1420 18 : // reads. We will need to add cancellation for that if necessary.
1421 18 : Self::spawn_blocking(move || {
1422 18 : let _span = span.entered();
1423 18 :
1424 18 : let res = self.evict_blocking(&timeline, &permit);
1425 18 :
1426 18 : let waiters = self.inner.initializer_count();
1427 18 :
1428 18 : if waiters > 0 {
1429 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1430 18 : }
1431 :
1432 18 : let completed_in = spawned_at.elapsed();
1433 18 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1434 18 :
1435 18 : match res {
1436 18 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1437 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1438 : }
1439 :
1440 18 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1441 18 : });
1442 18 :
1443 18 : Ok(())
1444 24 : }
1445 :
1446 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1447 18 : fn evict_blocking(
1448 18 : &self,
1449 18 : timeline: &Timeline,
1450 18 : _permit: &heavier_once_cell::InitPermit,
1451 18 : ) -> Result<(), EvictionCancelled> {
1452 18 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1453 18 :
1454 18 : match capture_mtime_and_remove(&self.path) {
1455 18 : Ok(local_layer_mtime) => {
1456 18 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1457 18 : match duration {
1458 18 : Ok(elapsed) => {
1459 18 : timeline
1460 18 : .metrics
1461 18 : .evictions_with_low_residence_duration
1462 18 : .read()
1463 18 : .unwrap()
1464 18 : .observe(elapsed);
1465 18 : tracing::info!(
1466 0 : residence_millis = elapsed.as_millis(),
1467 0 : "evicted layer after known residence period"
1468 : );
1469 : }
1470 : Err(_) => {
1471 0 : tracing::info!("evicted layer after unknown residence period");
1472 : }
1473 : }
1474 18 : timeline.metrics.evictions.inc();
1475 18 : timeline
1476 18 : .metrics
1477 18 : .resident_physical_size_sub(self.desc.file_size);
1478 : }
1479 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1480 0 : tracing::error!(
1481 : layer_size = %self.desc.file_size,
1482 0 : "failed to evict layer from disk, it was already gone"
1483 : );
1484 0 : return Err(EvictionCancelled::FileNotFound);
1485 : }
1486 0 : Err(e) => {
1487 0 : // FIXME: this should probably be an abort
1488 0 : tracing::error!("failed to evict file from disk: {e:#}");
1489 0 : return Err(EvictionCancelled::RemoveFailed);
1490 : }
1491 : }
1492 :
1493 18 : self.access_stats.record_residence_event(
1494 18 : LayerResidenceStatus::Evicted,
1495 18 : LayerResidenceEventReason::ResidenceChange,
1496 18 : );
1497 18 :
1498 18 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1499 18 :
1500 18 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1501 18 :
1502 18 : Ok(())
1503 18 : }
1504 :
1505 1631 : fn metadata(&self) -> LayerFileMetadata {
1506 1631 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1507 1631 : }
1508 :
1509 : /// Needed to use the entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1510 : ///
1511 : /// Synchronizing with spawned tasks is very complicated otherwise.
1512 30 : fn spawn<F>(fut: F)
1513 30 : where
1514 30 : F: std::future::Future<Output = ()> + Send + 'static,
1515 30 : {
1516 30 : #[cfg(test)]
1517 30 : tokio::task::spawn(fut);
1518 30 : #[cfg(not(test))]
1519 30 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1520 30 : }
1521 :
1522 : /// Needed so that tests can use their entered runtime; otherwise uses BACKGROUND_RUNTIME.
1523 353 : fn spawn_blocking<F>(f: F)
1524 353 : where
1525 353 : F: FnOnce() + Send + 'static,
1526 353 : {
1527 353 : #[cfg(test)]
1528 353 : tokio::task::spawn_blocking(f);
1529 353 : #[cfg(not(test))]
1530 353 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1531 353 : }
1532 : }
1533 :
1534 18 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1535 18 : let m = path.metadata()?;
1536 18 : let local_layer_mtime = m.modified()?;
1537 18 : std::fs::remove_file(path)?;
1538 18 : Ok(local_layer_mtime)
1539 18 : }
1540 :
1541 0 : #[derive(Debug, thiserror::Error)]
1542 : pub(crate) enum EvictionError {
1543 : #[error("layer was already evicted")]
1544 : NotFound,
1545 :
1546 : /// Evictions must always lose to downloads in races, and this time it happened.
1547 : #[error("layer was downloaded instead")]
1548 : Downloaded,
1549 :
1550 : #[error("eviction did not happen within timeout")]
1551 : Timeout,
1552 : }
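    : // Sketch of how a caller might treat these variants, assuming an eviction entry point along
    : // the lines of `evict_and_wait(timeout)` (illustration only):
    : //
    : //   match layer.evict_and_wait(timeout).await {
    : //       Ok(()) => { /* eviction completed */ }
    : //       // benign races: the layer was already evicted, or a concurrent read re-downloaded it
    : //       Err(EvictionError::NotFound | EvictionError::Downloaded) => {}
    : //       Err(EvictionError::Timeout) => { /* did not complete within the deadline */ }
    : //   }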
1553 :
1554 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1555 0 : #[derive(Debug, thiserror::Error)]
1556 : pub(crate) enum DownloadError {
1557 : #[error("timeline has already shutdown")]
1558 : TimelineShutdown,
1559 : #[error("context denies downloading")]
1560 : ContextAndConfigReallyDeniesDownloads,
1561 : #[error("downloading is really required but not allowed by this method")]
1562 : DownloadRequired,
1563 : #[error("layer path exists, but it is not a file: {0:?}")]
1564 : NotFile(std::fs::FileType),
1565 : /// Why no error here? Because it will be reported by page_service. We should also have done
1566 : /// the retries already.
1567 : #[error("downloading evicted layer file failed")]
1568 : DownloadFailed,
1569 : #[error("downloading failed, possibly for shutdown")]
1570 : DownloadCancelled,
1571 : #[error("pre-condition: stat before download failed")]
1572 : PreStatFailed(#[source] std::io::Error),
1573 :
1574 : #[cfg(test)]
1575 : #[error("failpoint: {0:?}")]
1576 : Failpoint(failpoints::FailpointKind),
1577 : }
1578 :
1579 : #[derive(Debug, PartialEq)]
1580 : pub(crate) enum NeedsDownload {
1581 : NotFound,
1582 : NotFile(std::fs::FileType),
1583 : WrongSize { actual: u64, expected: u64 },
1584 : }
1585 :
1586 : impl std::fmt::Display for NeedsDownload {
1587 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1588 6 : match self {
1589 6 : NeedsDownload::NotFound => write!(f, "file was not found"),
1590 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1591 0 : NeedsDownload::WrongSize { actual, expected } => {
1592 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1593 : }
1594 : }
1595 6 : }
1596 : }
1597 :
1598 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1599 : pub(crate) struct DownloadedLayer {
1600 : owner: Weak<LayerInner>,
1601 : // Use tokio OnceCell as we do not need to deinitialize this; it'll just get dropped with the
1602 : // DownloadedLayer.
1603 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
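    : // Version of the download this instance represents; handed back to the owner in `Drop` via
    : // `on_downloaded_layer_drop`, presumably so the owner can tell a drop of the current
    : // download apart from a drop of an already-replaced one.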
1604 : version: usize,
1605 : }
1606 :
1607 : impl std::fmt::Debug for DownloadedLayer {
1608 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1609 0 : f.debug_struct("DownloadedLayer")
1610 0 : // owner omitted because it is always "Weak"
1611 0 : .field("kind", &self.kind)
1612 0 : .field("version", &self.version)
1613 0 : .finish()
1614 0 : }
1615 : }
1616 :
1617 : impl Drop for DownloadedLayer {
1618 393 : fn drop(&mut self) {
1619 393 : if let Some(owner) = self.owner.upgrade() {
1620 24 : owner.on_downloaded_layer_drop(self.version);
1621 369 : } else {
1622 369 : // Layer::drop will handle cancelling the eviction; because of drop order and
1623 369 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1624 369 : }
1625 393 : }
1626 : }
1627 :
1628 : impl DownloadedLayer {
1629 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`], or fails to
1630 : /// initialize it permanently.
1631 : ///
1632 : /// The `owner` parameter is a strong reference to the same `LayerInner` that
1633 : /// `DownloadedLayer::owner` would yield when upgraded. Given how this method ends up being
1634 : /// called, we will always have the LayerInner on the call stack, so we can just use it.
1635 211103 : async fn get<'a>(
1636 211103 : &'a self,
1637 211103 : owner: &Arc<LayerInner>,
1638 211103 : ctx: &RequestContext,
1639 211103 : ) -> anyhow::Result<&'a LayerKind> {
1640 211103 : let init = || async {
1641 956 : assert_eq!(
1642 956 : Weak::as_ptr(&self.owner),
1643 956 : Arc::as_ptr(owner),
1644 956 : "these are the same, just avoiding the upgrade"
1645 956 : );
1646 956 :
1647 956 : let res = if owner.desc.is_delta {
1648 956 : let summary = Some(delta_layer::Summary::expected(
1649 894 : owner.desc.tenant_shard_id.tenant_id,
1650 894 : owner.desc.timeline_id,
1651 894 : owner.desc.key_range.clone(),
1652 894 : owner.desc.lsn_range.clone(),
1653 894 : ));
1654 894 : delta_layer::DeltaLayerInner::load(
1655 894 : &owner.path,
1656 894 : summary,
1657 894 : Some(owner.conf.max_vectored_read_bytes),
1658 894 : ctx,
1659 894 : )
1660 956 : .await
1661 956 : .map(|res| res.map(LayerKind::Delta))
1662 956 : } else {
1663 956 : let lsn = owner.desc.image_layer_lsn();
1664 62 : let summary = Some(image_layer::Summary::expected(
1665 62 : owner.desc.tenant_shard_id.tenant_id,
1666 62 : owner.desc.timeline_id,
1667 62 : owner.desc.key_range.clone(),
1668 62 : lsn,
1669 62 : ));
1670 62 : image_layer::ImageLayerInner::load(
1671 62 : &owner.path,
1672 62 : lsn,
1673 62 : summary,
1674 62 : Some(owner.conf.max_vectored_read_bytes),
1675 62 : ctx,
1676 62 : )
1677 956 : .await
1678 956 : .map(|res| res.map(LayerKind::Image))
1679 956 : };
1680 956 :
1681 956 : match res {
1682 956 : Ok(Ok(layer)) => Ok(Ok(layer)),
1683 956 : Ok(Err(transient)) => Err(transient),
1684 956 : Err(permanent) => {
1685 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1686 0 : // TODO(#5815): we are not logging all errors, so temporarily log them **once**
1687 0 : // here as well
1688 0 : let permanent = permanent.context("load layer");
1689 0 : tracing::error!("layer loading failed permanently: {permanent:#}");
1690 956 : Ok(Err(permanent))
1691 956 : }
1692 956 : }
1693 956 : };
1694 211103 : self.kind
1695 211103 : .get_or_try_init(init)
1696 : // return transient errors using `?`
1697 966 : .await?
1698 211103 : .as_ref()
1699 211103 : .map_err(|e| {
1700 0 : // errors are not clonabled, cannot but stringify
1701 0 : // test_broken_timeline matches this string
1702 0 : anyhow::anyhow!("layer loading failed: {e:#}")
1703 211103 : })
1704 211103 : }
1705 :
1706 210381 : async fn get_value_reconstruct_data(
1707 210381 : &self,
1708 210381 : key: Key,
1709 210381 : lsn_range: Range<Lsn>,
1710 210381 : reconstruct_data: &mut ValueReconstructState,
1711 210381 : owner: &Arc<LayerInner>,
1712 210381 : ctx: &RequestContext,
1713 210381 : ) -> anyhow::Result<ValueReconstructResult> {
1714 210381 : use LayerKind::*;
1715 210381 :
1716 210381 : match self.get(owner, ctx).await? {
1717 203442 : Delta(d) => {
1718 203442 : d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
1719 28353 : .await
1720 : }
1721 6939 : Image(i) => {
1722 6939 : i.get_value_reconstruct_data(key, reconstruct_data, ctx)
1723 711 : .await
1724 : }
1725 : }
1726 210381 : }
1727 :
1728 354 : async fn get_values_reconstruct_data(
1729 354 : &self,
1730 354 : keyspace: KeySpace,
1731 354 : lsn_range: Range<Lsn>,
1732 354 : reconstruct_data: &mut ValuesReconstructState,
1733 354 : owner: &Arc<LayerInner>,
1734 354 : ctx: &RequestContext,
1735 354 : ) -> Result<(), GetVectoredError> {
1736 354 : use LayerKind::*;
1737 354 :
1738 354 : match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
1739 252 : Delta(d) => {
1740 252 : d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
1741 10227 : .await
1742 : }
1743 102 : Image(i) => {
1744 102 : i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
1745 1198 : .await
1746 : }
1747 : }
1748 354 : }
1749 :
1750 4 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1751 4 : use LayerKind::*;
1752 4 : match self.get(owner, ctx).await? {
1753 4 : Delta(d) => d.dump(ctx).await?,
1754 0 : Image(i) => i.dump(ctx).await?,
1755 : }
1756 :
1757 4 : Ok(())
1758 4 : }
1759 : }
1760 :
1761 : /// Wrapper around an actual layer implementation.
1762 : #[derive(Debug)]
1763 : enum LayerKind {
1764 : Delta(delta_layer::DeltaLayerInner),
1765 : Image(image_layer::ImageLayerInner),
1766 : }
1767 :
1768 : /// Guard for forcing a layer to be resident while the guard exists.
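    : ///
    : /// A minimal usage sketch (illustration only; `download_and_keep_resident` stands in for
    : /// whichever entry point hands out the guard):
    : ///
    : /// ```ignore
    : /// // while `resident` (or any clone of it) is alive, the layer file cannot be evicted
    : /// let resident: ResidentLayer = layer.download_and_keep_resident().await?;
    : /// let _md = resident.metadata();
    : /// // release the guard, keeping only the evictable handle
    : /// let layer: Layer = resident.drop_eviction_guard();
    : /// ```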
1769 : #[derive(Clone)]
1770 : pub(crate) struct ResidentLayer {
1771 : owner: Layer,
1772 : downloaded: Arc<DownloadedLayer>,
1773 : }
1774 :
1775 : impl std::fmt::Display for ResidentLayer {
1776 1612 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1777 1612 : write!(f, "{}", self.owner)
1778 1612 : }
1779 : }
1780 :
1781 : impl std::fmt::Debug for ResidentLayer {
1782 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1783 0 : write!(f, "{}", self.owner)
1784 0 : }
1785 : }
1786 :
1787 : impl ResidentLayer {
1788 : /// Release the eviction guard, converting back into a plain [`Layer`].
1789 : ///
1790 : /// You can access the [`Layer`] also by using `as_ref`.
1791 340 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1792 340 : self.into()
1793 340 : }
1794 :
1795 : /// Loads all keys stored in the layer. Returns the key, LSN and value size for each entry.
1796 644 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1797 : pub(crate) async fn load_keys<'a>(
1798 : &'a self,
1799 : ctx: &RequestContext,
1800 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1801 : use LayerKind::*;
1802 :
1803 : let owner = &self.owner.0;
1804 :
1805 : match self.downloaded.get(owner, ctx).await? {
1806 : Delta(ref d) => {
1807 : owner
1808 : .access_stats
1809 : .record_access(LayerAccessKind::KeyIter, ctx);
1810 :
1811 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1812 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1813 : // while it's being held.
1814 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1815 : .await
1816 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1817 : }
1818 : Image(_) => anyhow::bail!(format!("cannot load_keys on an image layer {self}")),
1819 : }
1820 : }
1821 :
1822 : /// Returns the number of keys and values written to the writer.
1823 10 : pub(crate) async fn copy_delta_prefix(
1824 10 : &self,
1825 10 : writer: &mut super::delta_layer::DeltaLayerWriter,
1826 10 : until: Lsn,
1827 10 : ctx: &RequestContext,
1828 10 : ) -> anyhow::Result<usize> {
1829 10 : use LayerKind::*;
1830 10 :
1831 10 : let owner = &self.owner.0;
1832 10 :
1833 10 : match self.downloaded.get(owner, ctx).await? {
1834 10 : Delta(ref d) => d
1835 10 : .copy_prefix(writer, until, ctx)
1836 13 : .await
1837 10 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1838 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1839 : }
1840 10 : }
1841 :
1842 1184 : pub(crate) fn local_path(&self) -> &Utf8Path {
1843 1184 : &self.owner.0.path
1844 1184 : }
1845 :
1846 1290 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1847 1290 : self.owner.metadata()
1848 1290 : }
1849 :
1850 : #[cfg(test)]
1851 32 : pub(crate) async fn as_delta(
1852 32 : &self,
1853 32 : ctx: &RequestContext,
1854 32 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1855 32 : use LayerKind::*;
1856 32 : match self.downloaded.get(&self.owner.0, ctx).await? {
1857 32 : Delta(ref d) => Ok(d),
1858 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1859 : }
1860 32 : }
1861 : }
1862 :
1863 : impl AsLayerDesc for ResidentLayer {
1864 4510 : fn layer_desc(&self) -> &PersistentLayerDesc {
1865 4510 : self.owner.layer_desc()
1866 4510 : }
1867 : }
1868 :
1869 : impl AsRef<Layer> for ResidentLayer {
1870 1510 : fn as_ref(&self) -> &Layer {
1871 1510 : &self.owner
1872 1510 : }
1873 : }
1874 :
1875 : /// Drop the eviction guard.
1876 : impl From<ResidentLayer> for Layer {
1877 340 : fn from(value: ResidentLayer) -> Self {
1878 340 : value.owner
1879 340 : }
1880 : }
1881 :
1882 : use metrics::IntCounter;
1883 :
1884 : pub(crate) struct LayerImplMetrics {
1885 : started_evictions: IntCounter,
1886 : completed_evictions: IntCounter,
1887 : cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
1888 :
1889 : started_deletes: IntCounter,
1890 : completed_deletes: IntCounter,
1891 : failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
1892 :
1893 : rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
1894 : inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
1895 : redownload_after: metrics::Histogram,
1896 : time_to_evict: metrics::Histogram,
1897 : }
1898 :
1899 : impl Default for LayerImplMetrics {
1900 30 : fn default() -> Self {
1901 30 : use enum_map::Enum;
1902 30 :
1903 30 : // reminder: these will be pageserver_layer_* with "_total" suffix
1904 30 :
1905 30 : let started_evictions = metrics::register_int_counter!(
1906 : "pageserver_layer_started_evictions",
1907 : "Evictions started in the Layer implementation"
1908 : )
1909 30 : .unwrap();
1910 30 : let completed_evictions = metrics::register_int_counter!(
1911 : "pageserver_layer_completed_evictions",
1912 : "Evictions completed in the Layer implementation"
1913 : )
1914 30 : .unwrap();
1915 30 :
1916 30 : let cancelled_evictions = metrics::register_int_counter_vec!(
1917 : "pageserver_layer_cancelled_evictions_count",
1918 : "Different reasons for evictions to have been cancelled or failed",
1919 : &["reason"]
1920 : )
1921 30 : .unwrap();
1922 30 :
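    : // Resolve one child counter per `EvictionCancelled` variant up front so that
    : // `inc_eviction_cancelled` can index the EnumMap directly instead of doing a
    : // label lookup on every increment.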
1923 270 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1924 270 : let reason = EvictionCancelled::from_usize(i);
1925 270 : let s = reason.as_str();
1926 270 : cancelled_evictions.with_label_values(&[s])
1927 270 : }));
1928 30 :
1929 30 : let started_deletes = metrics::register_int_counter!(
1930 : "pageserver_layer_started_deletes",
1931 : "Deletions on drop pending in the Layer implementation"
1932 : )
1933 30 : .unwrap();
1934 30 : let completed_deletes = metrics::register_int_counter!(
1935 : "pageserver_layer_completed_deletes",
1936 : "Deletions on drop completed in the Layer implementation"
1937 : )
1938 30 : .unwrap();
1939 30 :
1940 30 : let failed_deletes = metrics::register_int_counter_vec!(
1941 : "pageserver_layer_failed_deletes_count",
1942 : "Different reasons for deletions on drop to have failed",
1943 : &["reason"]
1944 : )
1945 30 : .unwrap();
1946 30 :
1947 60 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1948 60 : let reason = DeleteFailed::from_usize(i);
1949 60 : let s = reason.as_str();
1950 60 : failed_deletes.with_label_values(&[s])
1951 60 : }));
1952 30 :
1953 30 : let rare_counters = metrics::register_int_counter_vec!(
1954 : "pageserver_layer_assumed_rare_count",
1955 : "Times unexpected or assumed rare event happened",
1956 : &["event"]
1957 : )
1958 30 : .unwrap();
1959 30 :
1960 210 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
1961 210 : let event = RareEvent::from_usize(i);
1962 210 : let s = event.as_str();
1963 210 : rare_counters.with_label_values(&[s])
1964 210 : }));
1965 30 :
1966 30 : let inits_cancelled = metrics::register_int_counter!(
1967 : "pageserver_layer_inits_cancelled_count",
1968 : "Times Layer initialization was cancelled",
1969 : )
1970 30 : .unwrap();
1971 30 :
1972 30 : let redownload_after = {
1973 30 : let minute = 60.0;
1974 30 : let hour = 60.0 * minute;
1975 : metrics::register_histogram!(
1976 : "pageserver_layer_redownloaded_after",
1977 : "Time between evicting and re-downloading.",
1978 : vec![
1979 : 10.0,
1980 : 30.0,
1981 : minute,
1982 : 5.0 * minute,
1983 : 15.0 * minute,
1984 : 30.0 * minute,
1985 : hour,
1986 : 12.0 * hour,
1987 : ]
1988 : )
1989 30 : .unwrap()
1990 30 : };
1991 30 :
1992 30 : let time_to_evict = metrics::register_histogram!(
1993 : "pageserver_layer_eviction_held_permit_seconds",
1994 : "Time eviction held the permit.",
1995 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
1996 : )
1997 30 : .unwrap();
1998 30 :
1999 30 : Self {
2000 30 : started_evictions,
2001 30 : completed_evictions,
2002 30 : cancelled_evictions,
2003 30 :
2004 30 : started_deletes,
2005 30 : completed_deletes,
2006 30 : failed_deletes,
2007 30 :
2008 30 : rare_counters,
2009 30 : inits_cancelled,
2010 30 : redownload_after,
2011 30 : time_to_evict,
2012 30 : }
2013 30 : }
2014 : }
2015 :
2016 : impl LayerImplMetrics {
2017 26 : fn inc_started_evictions(&self) {
2018 26 : self.started_evictions.inc();
2019 26 : }
2020 18 : fn inc_completed_evictions(&self) {
2021 18 : self.completed_evictions.inc();
2022 18 : }
2023 8 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2024 8 : self.cancelled_evictions[reason].inc()
2025 8 : }
2026 :
2027 336 : fn inc_started_deletes(&self) {
2028 336 : self.started_deletes.inc();
2029 336 : }
2030 335 : fn inc_completed_deletes(&self) {
2031 335 : self.completed_deletes.inc();
2032 335 : }
2033 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2034 0 : self.failed_deletes[reason].inc();
2035 0 : }
2036 :
2037 : /// Counted separately from failed layer deletes because we will complete the layer deletion
2038 : /// attempt regardless of a failure to delete the local file.
2039 0 : fn inc_delete_removes_failed(&self) {
2040 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2041 0 : }
2042 :
2043 : /// Expected to be rare, just as cancellations are rare, but we could have cancellations separate
2044 : /// from the single caller which can start the download, so use this counter to separate them.
2045 0 : fn inc_init_completed_without_requester(&self) {
2046 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2047 0 : }
2048 :
2049 : /// Expected to be rare because both cancellations and failures are unexpected.
2050 0 : fn inc_download_failed_without_requester(&self) {
2051 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2052 0 : }
2053 :
2054 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2055 : ///
2056 : /// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
2057 : /// Option.
2058 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2059 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2060 0 : }
2061 :
2062 : /// These are only expected up to the [`Self::inc_init_cancelled`] amount when
2063 : /// running with remote storage.
2064 6 : fn inc_init_needed_no_download(&self) {
2065 6 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2066 6 : }
2067 :
2068 : /// Expected to be rare because all layer files should be readable and valid.
2069 0 : fn inc_permanent_loading_failures(&self) {
2070 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2071 0 : }
2072 :
2073 0 : fn inc_init_cancelled(&self) {
2074 0 : self.inits_cancelled.inc()
2075 0 : }
2076 :
2077 6 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2078 6 : self.redownload_after.observe(duration.as_secs_f64())
2079 6 : }
2080 :
2081 : /// This would be bad if it ever happened, or it would mean extreme disk pressure. We should
2082 : /// probably cancel the eviction instead if there would be read waiters. However, we cannot
2083 : /// separate reads from other evictions, so this could have noise as well.
2084 0 : fn inc_evicted_with_waiters(&self) {
2085 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2086 0 : }
2087 :
2088 : /// Recorded, at least initially, because the permit is now acquired in an async context before
2089 : /// the spawn_blocking action.
2090 18 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2091 18 : self.time_to_evict.observe(duration.as_secs_f64())
2092 18 : }
2093 : }
2094 :
2095 : #[derive(Debug, Clone, Copy, enum_map::Enum)]
2096 : enum EvictionCancelled {
2097 : LayerGone,
2098 : TimelineGone,
2099 : VersionCheckFailed,
2100 : FileNotFound,
2101 : RemoveFailed,
2102 : AlreadyReinitialized,
2103 : /// Not evicted because of a pending reinitialization
2104 : LostToDownload,
2105 : /// After eviction, there was a new layer access which cancelled the eviction.
2106 : UpgradedBackOnAccess,
2107 : UnexpectedEvictedState,
2108 : }
2109 :
2110 : impl EvictionCancelled {
2111 270 : fn as_str(&self) -> &'static str {
2112 270 : match self {
2113 30 : EvictionCancelled::LayerGone => "layer_gone",
2114 30 : EvictionCancelled::TimelineGone => "timeline_gone",
2115 30 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2116 30 : EvictionCancelled::FileNotFound => "file_not_found",
2117 30 : EvictionCancelled::RemoveFailed => "remove_failed",
2118 30 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2119 30 : EvictionCancelled::LostToDownload => "lost_to_download",
2120 30 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2121 30 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2122 : }
2123 270 : }
2124 : }
2125 :
2126 : #[derive(enum_map::Enum)]
2127 : enum DeleteFailed {
2128 : TimelineGone,
2129 : DeleteSchedulingFailed,
2130 : }
2131 :
2132 : impl DeleteFailed {
2133 60 : fn as_str(&self) -> &'static str {
2134 60 : match self {
2135 30 : DeleteFailed::TimelineGone => "timeline_gone",
2136 30 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2137 : }
2138 60 : }
2139 : }
2140 :
2141 : #[derive(enum_map::Enum)]
2142 : enum RareEvent {
2143 : RemoveOnDropFailed,
2144 : InitCompletedWithoutRequester,
2145 : DownloadFailedWithoutRequester,
2146 : UpgradedWantedEvicted,
2147 : InitWithoutDownload,
2148 : PermanentLoadingFailure,
2149 : EvictedWithWaiters,
2150 : }
2151 :
2152 : impl RareEvent {
2153 210 : fn as_str(&self) -> &'static str {
2154 210 : use RareEvent::*;
2155 210 :
2156 210 : match self {
2157 30 : RemoveOnDropFailed => "remove_on_drop_failed",
2158 30 : InitCompletedWithoutRequester => "init_completed_without",
2159 30 : DownloadFailedWithoutRequester => "download_failed_without",
2160 30 : UpgradedWantedEvicted => "raced_wanted_evicted",
2161 30 : InitWithoutDownload => "init_needed_no_download",
2162 30 : PermanentLoadingFailure => "permanent_loading_failure",
2163 30 : EvictedWithWaiters => "evicted_with_waiters",
2164 : }
2165 210 : }
2166 : }
2167 :
2168 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2169 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|