Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::{
5 : HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
6 : };
7 : use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
8 : use std::ops::Range;
9 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10 : use std::sync::{Arc, Weak};
11 : use std::time::{Duration, SystemTime};
12 : use tracing::Instrument;
13 : use utils::id::TimelineId;
14 : use utils::lsn::Lsn;
15 : use utils::sync::{gate, heavier_once_cell};
16 :
17 : use crate::config::PageServerConf;
18 : use crate::context::{DownloadBehavior, RequestContext};
19 : use crate::repository::Key;
20 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
21 : use crate::task_mgr::TaskKind;
22 : use crate::tenant::timeline::GetVectoredError;
23 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
24 :
25 : use super::delta_layer::{self, DeltaEntry};
26 : use super::image_layer::{self};
27 : use super::{
28 : AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
29 : PersistentLayerDesc, ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
30 : };
31 :
32 : use utils::generation::Generation;
33 :
34 : #[cfg(test)]
35 : mod tests;
36 :
37 : #[cfg(test)]
38 : mod failpoints;
39 :
40 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
41 : /// range of LSNs.
42 : ///
43 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
44 : /// layers are used to ingest incoming WAL, and provide fast access to the
45 : /// recent page versions. On-disk layers are stored as files on disk, and are
46 : /// immutable. This type represents the on-disk kind, while the in-memory kind is represented by
47 : /// [`InMemoryLayer`].
48 : ///
49 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
50 : /// A delta layer contains all modifications within a range of LSNs and keys.
51 : /// An image layer is a snapshot of all the data in a key-range, at a single
52 : /// LSN.
53 : ///
54 : /// This type models the on-disk layers, which can be evicted and on-demand downloaded. As a
55 : /// general goal, read accesses should always win eviction and eviction should not wait for
56 : /// download.
57 : ///
58 : /// ### State transitions
59 : ///
60 : /// The internal state of `Layer` consists most importantly of the on-filesystem state and the
61 : /// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
62 : /// right size) or deleted.
63 : ///
64 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
65 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
66 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
67 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
68 : /// cancel the eviction.
69 : ///
70 : /// ```text
71 : /// +-----------------+ get_or_maybe_download +--------------------------------+
72 : /// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
73 : /// | ENOENT | /->| |
74 : /// +-----------------+ | +--------------------------------+
75 : /// ^ | | ^
76 : /// | get_or_maybe_download | | | get_or_maybe_download, either:
77 : /// evict_blocking | /-------------------------/ | | - upgrade weak to strong
78 : /// | | | | - re-initialize without download
79 : /// | | evict_and_wait | |
80 : /// +-----------------+ v |
81 : /// | not initialized | on_downloaded_layer_drop +--------------------------------------+
82 : /// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
83 : /// +-----------------+ +--------------------------------------+
84 : /// ```
85 : ///
86 : /// ### Unsupported
87 : ///
88 : /// - Evicting by the operator deleting files from the filesystem
89 : ///
90 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
91 : #[derive(Clone)]
92 : pub(crate) struct Layer(Arc<LayerInner>);
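// Illustrative sketch (not part of the original file): the read side of the state machine
// described above, using only methods defined further down in this file. The `layer` value is
// assumed to come from the timeline's layer map; the function name is hypothetical.
async fn read_side_sketch(layer: &Layer) -> anyhow::Result<()> {
    // Cheap, racy residency hint; good enough for e.g. collecting eviction candidates.
    let _maybe_resident = layer.is_likely_resident();

    // Pin the layer for reading: downloads on demand if it was evicted, and the returned
    // ResidentLayer guard keeps the file from being evicted while it is held.
    let resident = layer.download_and_keep_resident().await?;

    // ... perform reads through the pinned layer ...

    // Dropping the guard (and any other strong references) makes the layer evictable again.
    drop(resident);
    Ok(())
}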
93 :
94 : impl std::fmt::Display for Layer {
95 1886 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 1886 : write!(
97 1886 : f,
98 1886 : "{}{}",
99 1886 : self.layer_desc().short_id(),
100 1886 : self.0.generation.get_suffix()
101 1886 : )
102 1886 : }
103 : }
104 :
105 : impl std::fmt::Debug for Layer {
106 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 0 : write!(f, "{}", self)
108 0 : }
109 : }
110 :
111 : impl AsLayerDesc for Layer {
112 719328 : fn layer_desc(&self) -> &PersistentLayerDesc {
113 719328 : self.0.layer_desc()
114 719328 : }
115 : }
116 :
117 : impl PartialEq for Layer {
118 4 : fn eq(&self, other: &Self) -> bool {
119 4 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
120 4 : }
121 : }
122 :
123 1598 : pub(crate) fn local_layer_path(
124 1598 : conf: &PageServerConf,
125 1598 : tenant_shard_id: &TenantShardId,
126 1598 : timeline_id: &TimelineId,
127 1598 : layer_file_name: &LayerName,
128 1598 : generation: &Generation,
129 1598 : ) -> Utf8PathBuf {
130 1598 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
131 1598 :
132 1598 : if generation.is_none() {
133 : // Without a generation, we may only use legacy path style
134 0 : timeline_path.join(layer_file_name.to_string())
135 : } else {
136 1598 : timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
137 : }
138 1598 : }
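// Illustrative sketch (not part of the original file) of the two naming schemes produced above,
// with a hypothetical layer file name; the `<timeline dir>` prefix is whatever
// `conf.timeline_path(tenant_shard_id, timeline_id)` returns:
//
//   generation.is_none(): <timeline dir>/<layer file name>
//   otherwise:            <timeline dir>/<layer file name>-v1<generation suffix>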
139 :
140 : impl Layer {
141 : /// Creates a Layer value for a file we know not to be resident.
142 0 : pub(crate) fn for_evicted(
143 0 : conf: &'static PageServerConf,
144 0 : timeline: &Arc<Timeline>,
145 0 : file_name: LayerName,
146 0 : metadata: LayerFileMetadata,
147 0 : ) -> Self {
148 0 : let local_path = local_layer_path(
149 0 : conf,
150 0 : &timeline.tenant_shard_id,
151 0 : &timeline.timeline_id,
152 0 : &file_name,
153 0 : &metadata.generation,
154 0 : );
155 0 :
156 0 : let desc = PersistentLayerDesc::from_filename(
157 0 : timeline.tenant_shard_id,
158 0 : timeline.timeline_id,
159 0 : file_name,
160 0 : metadata.file_size,
161 0 : );
162 0 :
163 0 : let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
164 0 :
165 0 : let owner = Layer(Arc::new(LayerInner::new(
166 0 : conf,
167 0 : timeline,
168 0 : local_path,
169 0 : access_stats,
170 0 : desc,
171 0 : None,
172 0 : metadata.generation,
173 0 : metadata.shard,
174 0 : )));
175 0 :
176 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
177 :
178 0 : owner
179 0 : }
180 :
181 : /// Creates a Layer value for a file we know to be resident in the timeline directory.
182 24 : pub(crate) fn for_resident(
183 24 : conf: &'static PageServerConf,
184 24 : timeline: &Arc<Timeline>,
185 24 : local_path: Utf8PathBuf,
186 24 : file_name: LayerName,
187 24 : metadata: LayerFileMetadata,
188 24 : ) -> ResidentLayer {
189 24 : let desc = PersistentLayerDesc::from_filename(
190 24 : timeline.tenant_shard_id,
191 24 : timeline.timeline_id,
192 24 : file_name,
193 24 : metadata.file_size,
194 24 : );
195 24 :
196 24 : let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
197 24 :
198 24 : let mut resident = None;
199 24 :
200 24 : let owner = Layer(Arc::new_cyclic(|owner| {
201 24 : let inner = Arc::new(DownloadedLayer {
202 24 : owner: owner.clone(),
203 24 : kind: tokio::sync::OnceCell::default(),
204 24 : version: 0,
205 24 : });
206 24 : resident = Some(inner.clone());
207 24 :
208 24 : LayerInner::new(
209 24 : conf,
210 24 : timeline,
211 24 : local_path,
212 24 : access_stats,
213 24 : desc,
214 24 : Some(inner),
215 24 : metadata.generation,
216 24 : metadata.shard,
217 24 : )
218 24 : }));
219 24 :
220 24 : let downloaded = resident.expect("just initialized");
221 24 :
222 24 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
223 :
224 24 : timeline
225 24 : .metrics
226 24 : .resident_physical_size_add(metadata.file_size);
227 24 :
228 24 : ResidentLayer { downloaded, owner }
229 24 : }
230 :
231 : /// Creates a Layer value for freshly written out new layer file by renaming it from a
232 : /// temporary path.
233 1590 : pub(crate) fn finish_creating(
234 1590 : conf: &'static PageServerConf,
235 1590 : timeline: &Arc<Timeline>,
236 1590 : desc: PersistentLayerDesc,
237 1590 : temp_path: &Utf8Path,
238 1590 : ) -> anyhow::Result<ResidentLayer> {
239 1590 : let mut resident = None;
240 1590 :
241 1590 : let owner = Layer(Arc::new_cyclic(|owner| {
242 1590 : let inner = Arc::new(DownloadedLayer {
243 1590 : owner: owner.clone(),
244 1590 : kind: tokio::sync::OnceCell::default(),
245 1590 : version: 0,
246 1590 : });
247 1590 : resident = Some(inner.clone());
248 1590 : let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
249 1590 : access_stats.record_residence_event(
250 1590 : LayerResidenceStatus::Resident,
251 1590 : LayerResidenceEventReason::LayerCreate,
252 1590 : );
253 1590 :
254 1590 : let local_path = local_layer_path(
255 1590 : conf,
256 1590 : &timeline.tenant_shard_id,
257 1590 : &timeline.timeline_id,
258 1590 : &desc.layer_name(),
259 1590 : &timeline.generation,
260 1590 : );
261 1590 :
262 1590 : LayerInner::new(
263 1590 : conf,
264 1590 : timeline,
265 1590 : local_path,
266 1590 : access_stats,
267 1590 : desc,
268 1590 : Some(inner),
269 1590 : timeline.generation,
270 1590 : timeline.get_shard_index(),
271 1590 : )
272 1590 : }));
273 1590 :
274 1590 : let downloaded = resident.expect("just initialized");
275 1590 :
276 1590 : // We never want to overwrite an existing file, so we use `RENAME_NOREPLACE`.
277 1590 : // TODO: this leaves the temp file in place if the rename fails, risking us running
278 1590 : // out of space. Should we clean it up here or does the calling context deal with this?
279 1590 : utils::fs_ext::rename_noreplace(temp_path.as_std_path(), owner.local_path().as_std_path())
280 1590 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
281 :
282 1590 : Ok(ResidentLayer { downloaded, owner })
283 1590 : }
284 :
285 : /// Requests the layer to be evicted and waits for this to be done.
286 : ///
287 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
288 : ///
289 : /// If, due to bad luck or a blocked executor, we miss the actual eviction and the layer is
290 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
291 : ///
292 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
293 : /// will happen regardless of whether the future returned by this method completes, unless there is a
294 : /// read access before eviction gets to complete.
295 : ///
296 : /// Technically cancellation safe, but cancelling might shift which generation of the
297 : /// download-evict cycle is observed on retry.
298 36 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
299 52 : self.0.evict_and_wait(timeout).await
300 32 : }
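    // Illustrative sketch (not part of the original file): how an eviction-side caller is
    // expected to interpret the result of evict_and_wait. The ten second timeout is arbitrary.
    async fn evict_and_wait_sketch(&self) {
        match self.evict_and_wait(std::time::Duration::from_secs(10)).await {
            // The eviction we requested (or raced with) completed.
            Ok(()) => {}
            // The layer was not resident to begin with.
            Err(EvictionError::NotFound) => {}
            // A read access re-initialized or re-downloaded the layer before eviction finished.
            Err(EvictionError::Downloaded) => {}
            // We stopped waiting; the eviction may still complete in the background.
            Err(EvictionError::Timeout) => {}
        }
    }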
301 :
302 : /// Delete the layer file when `self` gets dropped, and also try to schedule a remote index upload
303 : /// at that point.
304 : ///
305 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
306 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
307 : /// the value this is called on gets dropped.
308 : ///
309 : /// This is ensured by both of those methods accepting references to Layer.
310 : ///
311 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
312 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
313 438 : pub(crate) fn delete_on_drop(&self) {
314 438 : self.0.delete_on_drop();
315 438 : }
316 :
317 : /// Return data needed to reconstruct given page at LSN.
318 : ///
319 : /// It is up to the caller to collect more data from the previous layer and
320 : /// perform WAL redo, if necessary.
321 : ///
322 : /// # Cancellation-Safety
323 : ///
324 : /// This method is cancellation-safe.
325 211502 : pub(crate) async fn get_value_reconstruct_data(
326 211502 : &self,
327 211502 : key: Key,
328 211502 : lsn_range: Range<Lsn>,
329 211502 : reconstruct_data: &mut ValueReconstructState,
330 211502 : ctx: &RequestContext,
331 211502 : ) -> anyhow::Result<ValueReconstructResult> {
332 : use anyhow::ensure;
333 :
334 211502 : let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
335 211502 : self.0
336 211502 : .access_stats
337 211502 : .record_access(LayerAccessKind::GetValueReconstructData, ctx);
338 211502 :
339 211502 : if self.layer_desc().is_delta {
340 204429 : ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
341 204429 : ensure!(self.layer_desc().key_range.contains(&key));
342 : } else {
343 7073 : ensure!(self.layer_desc().key_range.contains(&key));
344 7073 : ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
345 7073 : ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
346 : }
347 :
348 211502 : layer
349 211502 : .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
350 211502 : .instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
351 30360 : .await
352 211502 : .with_context(|| format!("get_value_reconstruct_data for layer {self}"))
353 211502 : }
354 :
355 226 : pub(crate) async fn get_values_reconstruct_data(
356 226 : &self,
357 226 : keyspace: KeySpace,
358 226 : lsn_range: Range<Lsn>,
359 226 : reconstruct_data: &mut ValuesReconstructState,
360 226 : ctx: &RequestContext,
361 226 : ) -> Result<(), GetVectoredError> {
362 226 : let layer = self
363 226 : .0
364 226 : .get_or_maybe_download(true, Some(ctx))
365 0 : .await
366 226 : .map_err(|err| match err {
367 0 : DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
368 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
369 226 : })?;
370 :
371 226 : self.0
372 226 : .access_stats
373 226 : .record_access(LayerAccessKind::GetValueReconstructData, ctx);
374 226 :
375 226 : layer
376 226 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
377 226 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
378 11414 : .await
379 226 : .map_err(|err| match err {
380 0 : GetVectoredError::Other(err) => GetVectoredError::Other(
381 0 : err.context(format!("get_values_reconstruct_data for layer {self}")),
382 0 : ),
383 0 : err => err,
384 226 : })
385 226 : }
386 :
387 : /// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
388 : #[allow(dead_code)]
389 0 : pub(crate) async fn load_key_values(
390 0 : &self,
391 0 : ctx: &RequestContext,
392 0 : ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
393 0 : let layer = self
394 0 : .0
395 0 : .get_or_maybe_download(true, Some(ctx))
396 0 : .await
397 0 : .map_err(|err| match err {
398 0 : DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
399 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
400 0 : })?;
401 0 : layer.load_key_values(&self.0, ctx).await
402 0 : }
403 :
404 : /// Download the layer if evicted.
405 : ///
406 : /// Will not error when the layer is already downloaded.
407 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
408 0 : self.0.get_or_maybe_download(true, None).await?;
409 0 : Ok(())
410 0 : }
411 :
412 : /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
413 : /// while the guard exists.
414 : ///
415 : /// Returns None if the layer is currently evicted or becoming evicted.
416 : #[cfg(test)]
417 20 : pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
418 20 : let downloaded = self.0.inner.get().and_then(|rowe| rowe.get())?;
419 :
420 14 : Some(ResidentLayer {
421 14 : downloaded,
422 14 : owner: self.clone(),
423 14 : })
424 20 : }
425 :
426 : /// Weak indicator of whether the layer is resident or not. Good enough for eviction, which can deal
427 : /// with `EvictionError::NotFound`.
428 : ///
429 : /// Returns `true` if this layer might be resident, or `false` if it is most likely evicted or
430 : /// will be unless a read happens soon.
431 44 : pub(crate) fn is_likely_resident(&self) -> bool {
432 44 : self.0
433 44 : .inner
434 44 : .get()
435 44 : .map(|rowe| rowe.is_likely_resident())
436 44 : .unwrap_or(false)
437 44 : }
438 :
439 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
440 430 : pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
441 430 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
442 :
443 430 : Ok(ResidentLayer {
444 430 : downloaded,
445 430 : owner: self.clone(),
446 430 : })
447 430 : }
448 :
449 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
450 0 : self.0.info(reset)
451 0 : }
452 :
453 0 : pub(crate) fn access_stats(&self) -> &LayerAccessStats {
454 0 : &self.0.access_stats
455 0 : }
456 :
457 1592 : pub(crate) fn local_path(&self) -> &Utf8Path {
458 1592 : &self.0.path
459 1592 : }
460 :
461 211498 : pub(crate) fn debug_str(&self) -> &Arc<str> {
462 211498 : &self.0.debug_str
463 211498 : }
464 :
465 1878 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
466 1878 : self.0.metadata()
467 1878 : }
468 :
469 0 : pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
470 0 : self.0
471 0 : .timeline
472 0 : .upgrade()
473 0 : .map(|timeline| timeline.timeline_id)
474 0 : }
475 :
476 : /// Traditional debug dumping facility
477 : #[allow(unused)]
478 4 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
479 4 : self.0.desc.dump();
480 4 :
481 4 : if verbose {
482 : // for now, unconditionally download everything, even if that might not be wanted.
483 4 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
484 8 : l.dump(&self.0, ctx).await?
485 0 : }
486 :
487 4 : Ok(())
488 4 : }
489 :
490 : /// Waits until this layer has been dropped (and if needed, local file deletion and remote
491 : /// deletion scheduling has completed).
492 : ///
493 : /// Does not start local deletion, use [`Self::delete_on_drop`] for that
494 : /// separately.
495 : #[cfg(any(feature = "testing", test))]
496 2 : pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
497 2 : let mut rx = self.0.status.as_ref().unwrap().subscribe();
498 :
499 2 : async move {
500 : loop {
501 6 : if rx.changed().await.is_err() {
502 2 : break;
503 0 : }
504 : }
505 2 : }
506 2 : }
507 : }
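// Illustrative sketch (not part of the original file, testing-only because wait_drop is): the
// deletion protocol described on delete_on_drop and wait_drop. Deletion is only *requested*
// here; the local file removal and the scheduling of the remote index deletion happen from
// LayerInner::drop, once the last clone of the Layer has been dropped.
#[cfg(any(feature = "testing", test))]
async fn delete_on_drop_sketch(layer: Layer) {
    let dropped = layer.wait_drop();
    layer.delete_on_drop();
    // Assuming this was the last strong reference (e.g. the layer was already unlinked from the
    // layer map), dropping it triggers the file removal and deletion scheduling.
    drop(layer);
    dropped.await;
}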
508 :
509 : /// The downloaded state ([`DownloadedLayer`]) can be either resident or wanted evicted.
510 : ///
511 : /// However when we want something evicted, we cannot evict it right away as there might be current
512 : /// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
513 : /// read with [`Layer::get_value_reconstruct_data`].
514 : ///
515 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
516 : #[derive(Debug)]
517 : enum ResidentOrWantedEvicted {
518 : Resident(Arc<DownloadedLayer>),
519 : WantedEvicted(Weak<DownloadedLayer>, usize),
520 : }
521 :
522 : impl ResidentOrWantedEvicted {
523 : /// Non-mutating access to a DownloadedLayer, if possible.
524 : ///
525 : /// This is not used on the read path (anything that calls
526 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
527 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
528 : #[cfg(test)]
529 14 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
530 14 : match self {
531 14 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
532 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
533 : }
534 14 : }
535 :
536 : /// Best-effort query for residency right now, not as strong a guarantee as receiving a strong
537 : /// reference from `ResidentOrWantedEvicted::get`.
538 38 : fn is_likely_resident(&self) -> bool {
539 38 : match self {
540 32 : ResidentOrWantedEvicted::Resident(_) => true,
541 6 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
542 : }
543 38 : }
544 :
545 : /// Upgrades any weak to strong if possible.
546 : ///
547 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
548 : /// happened.
549 212166 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
550 212166 : match self {
551 212158 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
552 8 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
553 0 : Some(strong) => {
554 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
555 0 :
556 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
557 0 :
558 0 : Some((strong, true))
559 : }
560 8 : None => None,
561 : },
562 : }
563 212166 : }
564 :
565 : /// When eviction is first requested, drop down to holding a [`Weak`].
566 : ///
567 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
568 : /// drop the possibly last strong reference outside of the mutex of
569 : /// [`heavier_once_cell::OnceCell`].
570 30 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
571 30 : match self {
572 26 : ResidentOrWantedEvicted::Resident(strong) => {
573 26 : let weak = Arc::downgrade(strong);
574 26 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
575 26 : std::mem::swap(self, &mut temp);
576 26 : match temp {
577 26 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
578 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
579 : }
580 : }
581 4 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
582 : }
583 30 : }
584 : }
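// Illustrative sketch (not part of the original file): the downgrade/upgrade protocol of
// ResidentOrWantedEvicted as driven by evict_and_wait and get_or_maybe_download. The helper and
// its assertions are hypothetical; in the real code these calls happen while holding the
// heavier_once_cell guard.
#[cfg(test)]
fn eviction_request_protocol_sketch(mut rowe: ResidentOrWantedEvicted) {
    // First eviction request: Resident -> WantedEvicted; the strong Arc is handed back so the
    // caller can drop it outside of the OnceCell lock.
    let strong = rowe.downgrade().expect("first downgrade sees the Resident variant");
    // A second eviction request finds WantedEvicted and is a no-op.
    assert!(rowe.downgrade().is_none());
    // While `strong` is alive, a read calling get_and_upgrade would flip this back to Resident
    // and cancel the eviction; after the last strong reference is gone, the upgrade fails and a
    // read has to go through the download/initialize path instead.
    drop(strong);
    assert!(rowe.get_and_upgrade().is_none());
}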
585 :
586 : struct LayerInner {
587 : /// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of
588 : /// [`Self::path`].
589 : conf: &'static PageServerConf,
590 :
591 : /// Full path to the file; unclear if this should exist anymore.
592 : path: Utf8PathBuf,
593 :
594 : /// String representation of the layer, used for traversal id.
595 : debug_str: Arc<str>,
596 :
597 : desc: PersistentLayerDesc,
598 :
599 : /// Timeline access is needed for remote timeline client and metrics.
600 : ///
601 : /// There should not be any access to the timeline for any reason without entering the
602 : /// [`Timeline::gate`] at the same time.
603 : timeline: Weak<Timeline>,
604 :
605 : access_stats: LayerAccessStats,
606 :
607 : /// This custom OnceCell is backed by std mutex, but only held for short time periods.
608 : ///
609 : /// Filesystem changes (download, evict) are only done while holding a permit which the
610 : /// `heavier_once_cell` provides.
611 : ///
612 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
613 : /// possibly read while not holding it.
614 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
615 :
616 : /// Whether we want to delete this layer locally and remotely when `LayerInner` is dropped
617 : wanted_deleted: AtomicBool,
618 :
619 : /// The version makes sure we only evict a specific initialization of the downloaded file.
620 : ///
621 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
622 : /// `ResidentOrWantedEvicted::WantedEvicted`.
623 : version: AtomicUsize,
624 :
625 : /// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
626 : /// starts, or completes.
627 : ///
628 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
629 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
630 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
631 : /// [`ResidentOrWantedEvicted::Resident`] on access.
632 : ///
633 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
634 : status: Option<tokio::sync::watch::Sender<Status>>,
635 :
636 : /// Counter for exponential backoff with the download.
637 : ///
638 : /// This is atomic only for the purposes of having additional data only accessed while holding
639 : /// the InitPermit.
640 : consecutive_failures: AtomicUsize,
641 :
642 : /// The generation of this Layer.
643 : ///
644 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
645 : /// for created layers from [`Timeline::generation`].
646 : generation: Generation,
647 :
648 : /// The shard of this Layer.
649 : ///
650 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
651 : /// current `ShardIdentity` (TODO: add link once it's introduced).
652 : ///
653 : /// For loaded layers, this may be some other value if the tenant has undergone
654 : /// a shard split since the layer was originally written.
655 : shard: ShardIndex,
656 :
657 : /// When the Layer was last evicted but has not been downloaded since.
658 : ///
659 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
660 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
661 :
662 : #[cfg(test)]
663 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
664 : }
665 :
666 : impl std::fmt::Display for LayerInner {
667 30 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
668 30 : write!(f, "{}", self.layer_desc().short_id())
669 30 : }
670 : }
671 :
672 : impl AsLayerDesc for LayerInner {
673 721528 : fn layer_desc(&self) -> &PersistentLayerDesc {
674 721528 : &self.desc
675 721528 : }
676 : }
677 :
678 : #[derive(Debug, Clone, Copy)]
679 : enum Status {
680 : Resident,
681 : Evicted,
682 : Downloading,
683 : }
684 :
685 : impl Drop for LayerInner {
686 500 : fn drop(&mut self) {
687 : // if there was a pending eviction, mark it cancelled here to balance metrics
688 500 : if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
689 2 : {
690 2 : // eviction has already been started
691 2 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
692 2 :
693 2 : // eviction request is intentionally not honored as no one is present to wait for it
694 2 : // and we could be delaying shutdown for nothing.
695 498 : }
696 :
697 500 : if let Some(timeline) = self.timeline.upgrade() {
698 : // Only need to decrement metrics if the timeline still exists: otherwise
699 : // it will have already de-registered these metrics via TimelineMetrics::shutdown
700 484 : if self.desc.is_delta() {
701 452 : timeline.metrics.layer_count_delta.dec();
702 452 : timeline.metrics.layer_size_delta.sub(self.desc.file_size);
703 452 : } else {
704 32 : timeline.metrics.layer_count_image.dec();
705 32 : timeline.metrics.layer_size_image.sub(self.desc.file_size);
706 32 : }
707 16 : }
708 :
709 500 : if !*self.wanted_deleted.get_mut() {
710 66 : return;
711 434 : }
712 :
713 434 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
714 :
715 434 : let path = std::mem::take(&mut self.path);
716 434 : let file_name = self.layer_desc().layer_name();
717 434 : let file_size = self.layer_desc().file_size;
718 434 : let timeline = self.timeline.clone();
719 434 : let meta = self.metadata();
720 434 : let status = self.status.take();
721 434 :
722 434 : Self::spawn_blocking(move || {
723 434 : let _g = span.entered();
724 434 :
725 434 : // carry this until we are finished for [`Layer::wait_drop`] support
726 434 : let _status = status;
727 :
728 434 : let Some(timeline) = timeline.upgrade() else {
729 : // no need to nag that timeline is gone: under normal situation on
730 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
731 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
732 0 : return;
733 : };
734 :
735 434 : let Ok(_guard) = timeline.gate.enter() else {
736 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
737 0 : return;
738 : };
739 :
740 434 : let removed = match std::fs::remove_file(path) {
741 432 : Ok(()) => true,
742 2 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
743 2 : // until we no longer do detaches by removing all local files before removing the
744 2 : // tenant from the global map, we will always get these errors even if we knew what
745 2 : // is the latest state.
746 2 : //
747 2 : // we currently do not track the latest state, so we'll also end up here on evicted
748 2 : // layers.
749 2 : false
750 : }
751 0 : Err(e) => {
752 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
753 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
754 0 : false
755 : }
756 : };
757 :
758 434 : if removed {
759 432 : timeline.metrics.resident_physical_size_sub(file_size);
760 432 : }
761 434 : let res = timeline
762 434 : .remote_client
763 434 : .schedule_deletion_of_unlinked(vec![(file_name, meta)]);
764 :
765 434 : if let Err(e) = res {
766 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
767 : // demonstrating this deadlock (without spawn_blocking): stop will drop
768 : // queued items, which will have ResidentLayer's, and those drops would try
769 : // to re-entrantly lock the RemoteTimelineClient inner state.
770 0 : if !timeline.is_active() {
771 0 : tracing::info!("scheduling deletion on drop failed: {e:#}");
772 : } else {
773 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
774 : }
775 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
776 434 : } else {
777 434 : LAYER_IMPL_METRICS.inc_completed_deletes();
778 434 : }
779 434 : });
780 500 : }
781 : }
782 :
783 : impl LayerInner {
784 : #[allow(clippy::too_many_arguments)]
785 1614 : fn new(
786 1614 : conf: &'static PageServerConf,
787 1614 : timeline: &Arc<Timeline>,
788 1614 : local_path: Utf8PathBuf,
789 1614 : access_stats: LayerAccessStats,
790 1614 : desc: PersistentLayerDesc,
791 1614 : downloaded: Option<Arc<DownloadedLayer>>,
792 1614 : generation: Generation,
793 1614 : shard: ShardIndex,
794 1614 : ) -> Self {
795 1614 : let (inner, version, init_status) = if let Some(inner) = downloaded {
796 1614 : let version = inner.version;
797 1614 : let resident = ResidentOrWantedEvicted::Resident(inner);
798 1614 : (
799 1614 : heavier_once_cell::OnceCell::new(resident),
800 1614 : version,
801 1614 : Status::Resident,
802 1614 : )
803 : } else {
804 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
805 : };
806 :
807 : // This object acts as a RAII guard on these metrics: increment on construction
808 1614 : if desc.is_delta() {
809 1366 : timeline.metrics.layer_count_delta.inc();
810 1366 : timeline.metrics.layer_size_delta.add(desc.file_size);
811 1366 : } else {
812 248 : timeline.metrics.layer_count_image.inc();
813 248 : timeline.metrics.layer_size_image.add(desc.file_size);
814 248 : }
815 :
816 1614 : LayerInner {
817 1614 : conf,
818 1614 : debug_str: {
819 1614 : format!("timelines/{}/{}", timeline.timeline_id, desc.layer_name()).into()
820 1614 : },
821 1614 : path: local_path,
822 1614 : desc,
823 1614 : timeline: Arc::downgrade(timeline),
824 1614 : access_stats,
825 1614 : wanted_deleted: AtomicBool::new(false),
826 1614 : inner,
827 1614 : version: AtomicUsize::new(version),
828 1614 : status: Some(tokio::sync::watch::channel(init_status).0),
829 1614 : consecutive_failures: AtomicUsize::new(0),
830 1614 : generation,
831 1614 : shard,
832 1614 : last_evicted_at: std::sync::Mutex::default(),
833 1614 : #[cfg(test)]
834 1614 : failpoints: Default::default(),
835 1614 : }
836 1614 : }
837 :
838 438 : fn delete_on_drop(&self) {
839 438 : let res =
840 438 : self.wanted_deleted
841 438 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
842 438 :
843 438 : if res.is_ok() {
844 434 : LAYER_IMPL_METRICS.inc_started_deletes();
845 434 : }
846 438 : }
847 :
848 : /// Cancellation safe; however, dropping the future and calling this method again might result
849 : /// in a new attempt to evict OR join the previously started attempt.
850 108 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
851 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
852 : let mut rx = self.status.as_ref().unwrap().subscribe();
853 :
854 : {
855 : let current = rx.borrow_and_update();
856 : match &*current {
857 : Status::Resident => {
858 : // we might get lucky and evict this; continue
859 : }
860 : Status::Evicted | Status::Downloading => {
861 : // it is already evicted
862 : return Err(EvictionError::NotFound);
863 : }
864 : }
865 : }
866 :
867 : let strong = {
868 : match self.inner.get() {
869 : Some(mut either) => either.downgrade(),
870 : None => {
871 : // we already have a scheduled eviction, which just has not gotten to run yet.
872 : // it might still race with a read access, but that could also get cancelled,
873 : // so let's say this is not evictable.
874 : return Err(EvictionError::NotFound);
875 : }
876 : }
877 : };
878 :
879 : if strong.is_some() {
880 : // drop the DownloadedLayer outside of the holding the guard
881 : drop(strong);
882 :
883 : // idea here is that only one evicter should ever get to witness a strong reference,
884 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
885 : // cancelled eviction and signal us, like it currently does.
886 : //
887 : // a second concurrent evict_and_wait will not see a strong reference.
888 : LAYER_IMPL_METRICS.inc_started_evictions();
889 : }
890 :
891 : let changed = rx.changed();
892 : let changed = tokio::time::timeout(timeout, changed).await;
893 :
894 : let Ok(changed) = changed else {
895 : return Err(EvictionError::Timeout);
896 : };
897 :
898 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
899 :
900 : let current = rx.borrow_and_update();
901 :
902 : match &*current {
903 : // the easiest case
904 : Status::Evicted => Ok(()),
905 : // it surely was evicted in between, but then there was a new access now; we can't know
906 : // if it'll succeed so lets just call it evicted
907 : Status::Downloading => Ok(()),
908 : // either the download which was started after eviction completed already, or it was
909 : // never evicted
910 : Status::Resident => Err(EvictionError::Downloaded),
911 : }
912 : }
913 :
914 : /// Cancellation safe.
915 212174 : async fn get_or_maybe_download(
916 212174 : self: &Arc<Self>,
917 212174 : allow_download: bool,
918 212174 : ctx: Option<&RequestContext>,
919 212174 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
920 16 : let (weak, permit) = {
921 : // get_or_init_detached can:
922 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
923 : // - be slow (wait for semaphore permit or closing)
924 212174 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
925 :
926 212174 : let locked = self
927 212174 : .inner
928 212174 : .get_or_init_detached()
929 2 : .await
930 212174 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
931 212174 :
932 212174 : scopeguard::ScopeGuard::into_inner(init_cancelled);
933 :
934 212158 : match locked {
935 : // this path could have been a RwLock::read
936 212158 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
937 0 : Ok(Ok((strong, _))) => {
938 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
939 0 : // previously a `evict_and_wait` was received. this is the only place when we
940 0 : // send out an update without holding the InitPermit.
941 0 : //
942 0 : // note that we also have dropped the Guard; this is fine, because we just made
943 0 : // a state change and are holding a strong reference to be returned.
944 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
945 0 : LAYER_IMPL_METRICS
946 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
947 0 :
948 0 : return Ok(strong);
949 : }
950 8 : Ok(Err(guard)) => {
951 8 : // path to here: we won the eviction, the file should still be on the disk.
952 8 : let (weak, permit) = guard.take_and_deinit();
953 8 : (Some(weak), permit)
954 : }
955 8 : Err(permit) => (None, permit),
956 : }
957 : };
958 :
959 16 : if let Some(weak) = weak {
960 : // only drop the weak after dropping the heavier_once_cell guard
961 8 : assert!(
962 8 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
963 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
964 : );
965 8 : }
966 :
967 16 : let timeline = self
968 16 : .timeline
969 16 : .upgrade()
970 16 : .ok_or_else(|| DownloadError::TimelineShutdown)?;
971 :
972 : // count cancellations, which currently remain largely unexpected
973 16 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
974 :
975 : // check if we really need to be downloaded: this can happen if a read access won the
976 : // semaphore before eviction.
977 : //
978 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
979 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
980 16 : let needs_download = self
981 16 : .needs_download()
982 13 : .await
983 16 : .map_err(DownloadError::PreStatFailed);
984 16 :
985 16 : scopeguard::ScopeGuard::into_inner(init_cancelled);
986 :
987 16 : let needs_download = needs_download?;
988 :
989 16 : let Some(reason) = needs_download else {
990 : // the file is present locally because eviction has not had a chance to run yet
991 :
992 : #[cfg(test)]
993 8 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
994 2 : .await?;
995 :
996 6 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
997 6 :
998 6 : return Ok(self.initialize_after_layer_is_on_disk(permit));
999 : };
1000 :
1001 : // we must download; getting cancelled before spawning the download is not an issue as
1002 : // any still running eviction would not find anything to evict.
1003 :
1004 8 : if let NeedsDownload::NotFile(ft) = reason {
1005 0 : return Err(DownloadError::NotFile(ft));
1006 8 : }
1007 :
1008 8 : if let Some(ctx) = ctx {
1009 2 : self.check_expected_download(ctx)?;
1010 6 : }
1011 :
1012 8 : if !allow_download {
1013 : // this is only used from tests, but it is hard to test without the boolean
1014 2 : return Err(DownloadError::DownloadRequired);
1015 6 : }
1016 6 :
1017 6 : let download_ctx = ctx
1018 6 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
1019 6 : .unwrap_or(RequestContext::new(
1020 6 : TaskKind::LayerDownload,
1021 6 : DownloadBehavior::Download,
1022 6 : ));
1023 :
1024 6 : async move {
1025 6 : tracing::info!(%reason, "downloading on-demand");
1026 :
1027 6 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1028 6 : let res = self
1029 6 : .download_init_and_wait(timeline, permit, download_ctx)
1030 10 : .await?;
1031 6 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1032 6 : Ok(res)
1033 6 : }
1034 6 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
1035 10 : .await
1036 212174 : }
1037 :
1038 : /// Nag or fail per RequestContext policy
1039 2 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1040 2 : use crate::context::DownloadBehavior::*;
1041 2 : let b = ctx.download_behavior();
1042 2 : match b {
1043 2 : Download => Ok(()),
1044 : Warn | Error => {
1045 0 : tracing::info!(
1046 0 : "unexpectedly on-demand downloading for task kind {:?}",
1047 0 : ctx.task_kind()
1048 : );
1049 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1050 :
1051 0 : let really_error =
1052 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1053 :
1054 0 : if really_error {
1055 : // this check is only probabilistic; seems like a flakiness footgun
1056 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1057 : } else {
1058 0 : Ok(())
1059 : }
1060 : }
1061 : }
1062 2 : }
1063 :
1064 : /// Actual download; at most one is executed at a time.
1065 6 : async fn download_init_and_wait(
1066 6 : self: &Arc<Self>,
1067 6 : timeline: Arc<Timeline>,
1068 6 : permit: heavier_once_cell::InitPermit,
1069 6 : ctx: RequestContext,
1070 6 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1071 6 : debug_assert_current_span_has_tenant_and_timeline_id();
1072 6 :
1073 6 : let (tx, rx) = tokio::sync::oneshot::channel();
1074 6 :
1075 6 : let this: Arc<Self> = self.clone();
1076 :
1077 6 : let guard = timeline
1078 6 : .gate
1079 6 : .enter()
1080 6 : .map_err(|_| DownloadError::DownloadCancelled)?;
1081 :
1082 6 : Self::spawn(
1083 6 : async move {
1084 0 : let _guard = guard;
1085 0 :
1086 0 : // now that we have committed to downloading, send out an update to:
1087 0 : // - unhang any pending eviction
1088 0 : // - break out of evict_and_wait
1089 0 : this.status
1090 0 : .as_ref()
1091 0 : .unwrap()
1092 0 : .send_replace(Status::Downloading);
1093 6 :
1094 6 : #[cfg(test)]
1095 6 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1096 2 : .await
1097 6 : .unwrap();
1098 :
1099 99 : let res = this.download_and_init(timeline, permit, &ctx).await;
1100 :
1101 6 : if let Err(res) = tx.send(res) {
1102 0 : match res {
1103 0 : Ok(_res) => {
1104 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1105 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1106 : }
1107 0 : Err(e) => {
1108 0 : tracing::info!(
1109 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1110 : );
1111 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1112 : }
1113 : }
1114 6 : }
1115 6 : }
1116 6 : .in_current_span(),
1117 6 : );
1118 6 :
1119 10 : match rx.await {
1120 6 : Ok(Ok(res)) => Ok(res),
1121 : Ok(Err(remote_storage::DownloadError::Cancelled)) => {
1122 0 : Err(DownloadError::DownloadCancelled)
1123 : }
1124 0 : Ok(Err(_)) => Err(DownloadError::DownloadFailed),
1125 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1126 : }
1127 6 : }
1128 :
1129 6 : async fn download_and_init(
1130 6 : self: &Arc<LayerInner>,
1131 6 : timeline: Arc<Timeline>,
1132 6 : permit: heavier_once_cell::InitPermit,
1133 6 : ctx: &RequestContext,
1134 6 : ) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
1135 6 : let result = timeline
1136 6 : .remote_client
1137 6 : .download_layer_file(
1138 6 : &self.desc.layer_name(),
1139 6 : &self.metadata(),
1140 6 : &self.path,
1141 6 : &timeline.cancel,
1142 6 : ctx,
1143 6 : )
1144 93 : .await;
1145 :
1146 6 : match result {
1147 6 : Ok(size) => {
1148 6 : assert_eq!(size, self.desc.file_size);
1149 :
1150 6 : match self.needs_download().await {
1151 0 : Ok(Some(reason)) => {
1152 0 : // this is really a bug in needs_download or remote timeline client
1153 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1154 : }
1155 6 : Ok(None) => {
1156 6 : // as expected
1157 6 : }
1158 0 : Err(e) => {
1159 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1160 : }
1161 : }
1162 :
1163 6 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1164 6 : timeline
1165 6 : .metrics
1166 6 : .resident_physical_size_add(self.desc.file_size);
1167 6 : self.consecutive_failures.store(0, Ordering::Relaxed);
1168 6 :
1169 6 : let since_last_eviction = self
1170 6 : .last_evicted_at
1171 6 : .lock()
1172 6 : .unwrap()
1173 6 : .take()
1174 6 : .map(|ts| ts.elapsed());
1175 6 : if let Some(since_last_eviction) = since_last_eviction {
1176 6 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1177 6 : }
1178 :
1179 6 : self.access_stats.record_residence_event(
1180 6 : LayerResidenceStatus::Resident,
1181 6 : LayerResidenceEventReason::ResidenceChange,
1182 6 : );
1183 6 :
1184 6 : Ok(self.initialize_after_layer_is_on_disk(permit))
1185 : }
1186 0 : Err(e) => {
1187 0 : let consecutive_failures =
1188 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1189 0 :
1190 0 : if timeline.cancel.is_cancelled() {
1191 : // If we're shutting down, drop out before logging the error
1192 0 : return Err(e);
1193 0 : }
1194 0 :
1195 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1196 :
1197 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1198 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1199 0 : 1.5,
1200 0 : 60.0,
1201 0 : );
1202 0 :
1203 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1204 :
1205 : tokio::select! {
1206 : _ = tokio::time::sleep(backoff) => {},
1207 : _ = timeline.cancel.cancelled() => {},
1208 : };
1209 :
1210 0 : Err(e)
1211 : }
1212 : }
1213 6 : }
1214 :
1215 : /// Initializes the `Self::inner` to a "resident" state.
1216 : ///
1217 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1218 : /// before calling this method.
1219 : ///
1220 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1221 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1222 12 : fn initialize_after_layer_is_on_disk(
1223 12 : self: &Arc<LayerInner>,
1224 12 : permit: heavier_once_cell::InitPermit,
1225 12 : ) -> Arc<DownloadedLayer> {
1226 12 : debug_assert_current_span_has_tenant_and_timeline_id();
1227 12 :
1228 12 : // disable any scheduled but not yet running eviction deletions for this initialization
1229 12 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1230 12 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1231 12 :
1232 12 : let res = Arc::new(DownloadedLayer {
1233 12 : owner: Arc::downgrade(self),
1234 12 : kind: tokio::sync::OnceCell::default(),
1235 12 : version: next_version,
1236 12 : });
1237 12 :
1238 12 : let waiters = self.inner.initializer_count();
1239 12 : if waiters > 0 {
1240 0 : tracing::info!(waiters, "completing layer init for other tasks");
1241 12 : }
1242 :
1243 12 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1244 12 :
1245 12 : self.inner.set(value, permit);
1246 12 :
1247 12 : res
1248 12 : }
1249 :
1250 24 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1251 24 : match tokio::fs::metadata(&self.path).await {
1252 16 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1253 8 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1254 0 : Err(e) => Err(e),
1255 : }
1256 24 : }
1257 :
1258 24 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1259 24 : match self.path.metadata() {
1260 24 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1261 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1262 0 : Err(e) => Err(e),
1263 : }
1264 24 : }
1265 :
1266 40 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1267 40 : // in future, this should include sha2-256 validation of the file.
1268 40 : if !m.is_file() {
1269 0 : Err(NeedsDownload::NotFile(m.file_type()))
1270 40 : } else if m.len() != self.desc.file_size {
1271 0 : Err(NeedsDownload::WrongSize {
1272 0 : actual: m.len(),
1273 0 : expected: self.desc.file_size,
1274 0 : })
1275 : } else {
1276 40 : Ok(())
1277 : }
1278 40 : }
1279 :
1280 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1281 0 : let layer_name = self.desc.layer_name().to_string();
1282 0 :
1283 0 : let resident = self
1284 0 : .inner
1285 0 : .get()
1286 0 : .map(|rowe| rowe.is_likely_resident())
1287 0 : .unwrap_or(false);
1288 0 :
1289 0 : let access_stats = self.access_stats.as_api_model(reset);
1290 0 :
1291 0 : if self.desc.is_delta {
1292 0 : let lsn_range = &self.desc.lsn_range;
1293 0 :
1294 0 : HistoricLayerInfo::Delta {
1295 0 : layer_file_name: layer_name,
1296 0 : layer_file_size: self.desc.file_size,
1297 0 : lsn_start: lsn_range.start,
1298 0 : lsn_end: lsn_range.end,
1299 0 : remote: !resident,
1300 0 : access_stats,
1301 0 : l0: crate::tenant::layer_map::LayerMap::is_l0(self.layer_desc()),
1302 0 : }
1303 : } else {
1304 0 : let lsn = self.desc.image_layer_lsn();
1305 0 :
1306 0 : HistoricLayerInfo::Image {
1307 0 : layer_file_name: layer_name,
1308 0 : layer_file_size: self.desc.file_size,
1309 0 : lsn_start: lsn,
1310 0 : remote: !resident,
1311 0 : access_stats,
1312 0 : }
1313 : }
1314 0 : }
1315 :
1316 : /// `DownloadedLayer` is being dropped, so it calls this method.
1317 24 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1318 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1319 : // though here it is very likely
1320 24 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1321 :
1322 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1323 : // drop while the `self.inner` is being locked, leading to a deadlock.
1324 :
1325 24 : let start_evicting = async move {
1326 24 : #[cfg(test)]
1327 24 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1328 14 : .await
1329 24 : .expect("failpoint should not have errored");
1330 24 :
1331 24 : tracing::debug!("eviction started");
1332 :
1333 24 : let res = self.wait_for_turn_and_evict(only_version).await;
1334 : // metrics: ignore the Ok branch, it is not done yet
1335 24 : if let Err(e) = res {
1336 6 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1337 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1338 18 : }
1339 24 : };
1340 :
1341 24 : Self::spawn(start_evicting.instrument(span));
1342 24 : }
1343 :
1344 24 : async fn wait_for_turn_and_evict(
1345 24 : self: Arc<LayerInner>,
1346 24 : only_version: usize,
1347 24 : ) -> Result<(), EvictionCancelled> {
1348 46 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1349 46 : use Status::*;
1350 46 : match status {
1351 44 : Resident => Ok(()),
1352 2 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1353 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1354 : }
1355 46 : }
1356 :
1357 24 : let timeline = self
1358 24 : .timeline
1359 24 : .upgrade()
1360 24 : .ok_or(EvictionCancelled::TimelineGone)?;
1361 :
1362 24 : let mut rx = self
1363 24 : .status
1364 24 : .as_ref()
1365 24 : .expect("LayerInner cannot be dropped, holding strong ref")
1366 24 : .subscribe();
1367 24 :
1368 24 : is_good_to_continue(&rx.borrow_and_update())?;
1369 :
1370 22 : let Ok(gate) = timeline.gate.enter() else {
1371 0 : return Err(EvictionCancelled::TimelineGone);
1372 : };
1373 :
1374 18 : let permit = {
1375 : // we cannot just `std::fs::remove_file` because there might already be an
1376 : // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
1377 : // operations must be done while holding the heavier_once_cell::InitPermit
1378 22 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1379 :
1380 22 : let waited = loop {
1381 22 : // we must race against the transition to Downloading, otherwise we would have to wait until the
1382 22 : // completion of the download. waiting for download could be long and hinder our
1383 22 : // efforts to alert on "hanging" evictions.
1384 22 : tokio::select! {
1385 : res = &mut wait => break res,
1386 : _ = rx.changed() => {
1387 : is_good_to_continue(&rx.borrow_and_update())?;
1388 : // two possibilities for Status::Resident:
1389 : // - the layer was found locally from disk by a read
1390 : // - we missed a bunch of updates and now the layer is
1391 : // again downloaded -- assume we'll fail later on with
1392 : // version check or AlreadyReinitialized
1393 : }
1394 22 : }
1395 22 : };
1396 :
1397 : // re-check now that we have the guard or permit; all updates should have happened
1398 : // while holding the permit.
1399 22 : is_good_to_continue(&rx.borrow_and_update())?;
1400 :
1401 : // the term deinitialize is used here, because clearing out the Weak will eventually
1402 : // lead to deallocating the reference counted value, and the value we take with
1403 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1404 22 : let (_weak, permit) = match waited {
1405 20 : Ok(guard) => {
1406 20 : match &*guard {
1407 18 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1408 18 : if *version == only_version =>
1409 16 : {
1410 16 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1411 16 : let (weak, permit) = guard.take_and_deinit();
1412 16 : (Some(weak), permit)
1413 : }
1414 2 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1415 2 : // if we were not doing the version check, we would need to try to
1416 2 : // upgrade the weak here to see if it really is dropped. version check
1417 2 : // is done instead assuming that it is cheaper.
1418 2 : tracing::debug!(
1419 : version,
1420 : only_version,
1421 0 : "version mismatch, not deinitializing"
1422 : );
1423 2 : return Err(EvictionCancelled::VersionCheckFailed);
1424 : }
1425 : ResidentOrWantedEvicted::Resident(_) => {
1426 2 : return Err(EvictionCancelled::AlreadyReinitialized);
1427 : }
1428 : }
1429 : }
1430 2 : Err(permit) => {
1431 2 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1432 2 : (None, permit)
1433 : }
1434 : };
1435 :
1436 18 : permit
1437 18 : };
1438 18 :
1439 18 : let span = tracing::Span::current();
1440 18 :
1441 18 : let spawned_at = std::time::Instant::now();
1442 18 :
1443 18 : // this is on purpose a detached spawn; we don't need to wait for it
1444 18 : //
1445 18 : // eviction completion reporting is the only thing hinging on this, and it can be just as
1446 18 : // well from a spawn_blocking thread.
1447 18 : //
1448 18 : // important to note that now that we've acquired the permit we have made sure the evicted
1449 18 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1450 18 : // there are multiple evictions. The rest is not cancellable, and we've now commited to
1451 18 : // evicting.
1452 18 : //
1453 18 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1454 18 : // reads. We will need to add cancellation for that if necessary.
1455 18 : Self::spawn_blocking(move || {
1456 18 : let _span = span.entered();
1457 18 :
1458 18 : let res = self.evict_blocking(&timeline, &gate, &permit);
1459 18 :
1460 18 : let waiters = self.inner.initializer_count();
1461 18 :
1462 18 : if waiters > 0 {
1463 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1464 18 : }
1465 :
1466 18 : let completed_in = spawned_at.elapsed();
1467 18 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1468 18 :
1469 18 : match res {
1470 18 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1471 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1472 : }
1473 :
1474 18 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1475 18 : });
1476 18 :
1477 18 : Ok(())
1478 24 : }
1479 :
1480 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1481 18 : fn evict_blocking(
1482 18 : &self,
1483 18 : timeline: &Timeline,
1484 18 : _gate: &gate::GateGuard,
1485 18 : _permit: &heavier_once_cell::InitPermit,
1486 18 : ) -> Result<(), EvictionCancelled> {
1487 18 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1488 18 :
1489 18 : match capture_mtime_and_remove(&self.path) {
1490 18 : Ok(local_layer_mtime) => {
1491 18 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1492 18 : match duration {
1493 18 : Ok(elapsed) => {
1494 18 : let accessed = self.access_stats.accessed();
1495 18 : if accessed {
1496 4 : // Only layers used for reads contribute to our "low residence" metric that is used
1497 4 : // to detect thrashing. Layers promoted for other reasons (e.g. compaction) are allowed
1498 4 : // to be rapidly evicted without contributing to this metric.
1499 4 : timeline
1500 4 : .metrics
1501 4 : .evictions_with_low_residence_duration
1502 4 : .read()
1503 4 : .unwrap()
1504 4 : .observe(elapsed);
1505 14 : }
1506 :
1507 18 : tracing::info!(
1508 0 : residence_millis = elapsed.as_millis(),
1509 0 : accessed,
1510 0 : "evicted layer after known residence period"
1511 : );
1512 : }
1513 : Err(_) => {
1514 0 : tracing::info!("evicted layer after unknown residence period");
1515 : }
1516 : }
1517 18 : timeline.metrics.evictions.inc();
1518 18 : timeline
1519 18 : .metrics
1520 18 : .resident_physical_size_sub(self.desc.file_size);
1521 : }
1522 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1523 0 : tracing::error!(
1524 : layer_size = %self.desc.file_size,
1525 0 : "failed to evict layer from disk, it was already gone"
1526 : );
1527 0 : return Err(EvictionCancelled::FileNotFound);
1528 : }
1529 0 : Err(e) => {
1530 0 : // FIXME: this should probably be an abort
1531 0 : tracing::error!("failed to evict file from disk: {e:#}");
1532 0 : return Err(EvictionCancelled::RemoveFailed);
1533 : }
1534 : }
1535 :
1536 18 : self.access_stats.record_residence_event(
1537 18 : LayerResidenceStatus::Evicted,
1538 18 : LayerResidenceEventReason::ResidenceChange,
1539 18 : );
1540 18 :
1541 18 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1542 18 :
1543 18 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1544 18 :
1545 18 : Ok(())
1546 18 : }
1547 :
1548 2318 : fn metadata(&self) -> LayerFileMetadata {
1549 2318 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1550 2318 : }
1551 :
1552 : /// Needed to use the entered runtime in tests; otherwise uses BACKGROUND_RUNTIME.
1553 : ///
1554 : /// Synchronizing with spawned tasks is very complicated otherwise.
1555 30 : fn spawn<F>(fut: F)
1556 30 : where
1557 30 : F: std::future::Future<Output = ()> + Send + 'static,
1558 30 : {
1559 30 : #[cfg(test)]
1560 30 : tokio::task::spawn(fut);
1561 30 : #[cfg(not(test))]
1562 30 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1563 30 : }
1564 :
1565 : /// Needed to use the entered runtime in tests; otherwise uses BACKGROUND_RUNTIME.
1566 452 : fn spawn_blocking<F>(f: F)
1567 452 : where
1568 452 : F: FnOnce() + Send + 'static,
1569 452 : {
1570 452 : #[cfg(test)]
1571 452 : tokio::task::spawn_blocking(f);
1572 452 : #[cfg(not(test))]
1573 452 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1574 452 : }
1575 : }
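// A minimal, hedged sketch of the cfg-gated runtime dispatch used by `spawn` and
// `spawn_blocking` above, in isolation: under `cfg(test)` the task goes to the currently
// entered runtime so tests can synchronize with it, otherwise to a dedicated background
// runtime. `BACKGROUND` and `spawn_detached` are illustrative stand-ins, not items of this
// module.
static BACKGROUND: once_cell::sync::Lazy<tokio::runtime::Runtime> =
    once_cell::sync::Lazy::new(|| {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("failed to build example background runtime")
    });

fn spawn_detached<F>(fut: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    // in tests, reuse the runtime the test is already running on
    #[cfg(test)]
    tokio::task::spawn(fut);
    // in production, hand the future off to the dedicated runtime
    #[cfg(not(test))]
    BACKGROUND.spawn(fut);
}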
1576 :
1577 18 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1578 18 : let m = path.metadata()?;
1579 18 : let local_layer_mtime = m.modified()?;
1580 18 : std::fs::remove_file(path)?;
1581 18 : Ok(local_layer_mtime)
1582 18 : }
1583 :
1584 0 : #[derive(Debug, thiserror::Error)]
1585 : pub(crate) enum EvictionError {
1586 : #[error("layer was already evicted")]
1587 : NotFound,
1588 :
1589 : /// Evictions must always lose to downloads in races, and this time it happened.
1590 : #[error("layer was downloaded instead")]
1591 : Downloaded,
1592 :
1593 : #[error("eviction did not happen within timeout")]
1594 : Timeout,
1595 : }
1596 :
1597 : /// Error internal to the [`LayerInner::get_or_maybe_download`]
1598 0 : #[derive(Debug, thiserror::Error)]
1599 : pub(crate) enum DownloadError {
1600 : #[error("timeline has already shutdown")]
1601 : TimelineShutdown,
1602 : #[error("context denies downloading")]
1603 : ContextAndConfigReallyDeniesDownloads,
1604 : #[error("downloading is really required but not allowed by this method")]
1605 : DownloadRequired,
1606 : #[error("layer path exists, but it is not a file: {0:?}")]
1607 : NotFile(std::fs::FileType),
1608 : /// Why no error here? Because it will be reported by page_service. We should have also done
1609 : /// retries already.
1610 : #[error("downloading evicted layer file failed")]
1611 : DownloadFailed,
1612 : #[error("downloading failed, possibly for shutdown")]
1613 : DownloadCancelled,
1614 : #[error("pre-condition: stat before download failed")]
1615 : PreStatFailed(#[source] std::io::Error),
1616 :
1617 : #[cfg(test)]
1618 : #[error("failpoint: {0:?}")]
1619 : Failpoint(failpoints::FailpointKind),
1620 : }
1621 :
1622 : #[derive(Debug, PartialEq)]
1623 : pub(crate) enum NeedsDownload {
1624 : NotFound,
1625 : NotFile(std::fs::FileType),
1626 : WrongSize { actual: u64, expected: u64 },
1627 : }
1628 :
1629 : impl std::fmt::Display for NeedsDownload {
1630 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1631 6 : match self {
1632 6 : NeedsDownload::NotFound => write!(f, "file was not found"),
1633 0 : NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
1634 0 : NeedsDownload::WrongSize { actual, expected } => {
1635 0 : write!(f, "file size mismatch {actual} vs. {expected}")
1636 : }
1637 : }
1638 6 : }
1639 : }
1640 :
1641 : /// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
1642 : pub(crate) struct DownloadedLayer {
1643 : owner: Weak<LayerInner>,
1644 : // Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
1645 : // DownloadedLayer
1646 : kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
1647 : version: usize,
1648 : }
1649 :
1650 : impl std::fmt::Debug for DownloadedLayer {
1651 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1652 0 : f.debug_struct("DownloadedLayer")
1653 0 : // owner omitted because it is always "Weak"
1654 0 : .field("kind", &self.kind)
1655 0 : .field("version", &self.version)
1656 0 : .finish()
1657 0 : }
1658 : }
1659 :
1660 : impl Drop for DownloadedLayer {
1661 522 : fn drop(&mut self) {
1662 522 : if let Some(owner) = self.owner.upgrade() {
1663 24 : owner.on_downloaded_layer_drop(self.version);
1664 498 : } else {
1665 498 : // Layer::drop will handle cancelling the eviction; because of drop order and
1666 498 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1667 498 : }
1668 522 : }
1669 : }
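// A hedged sketch of the same drop-notification shape in isolation: the child holds only a
// `Weak` back-reference and, on drop, upgrades it to tell the owner which version went away;
// if the owner is already gone there is nothing to notify. `ExampleOwner` and `ExampleChild`
// are made-up names, not types of this module.
struct ExampleOwner;

impl ExampleOwner {
    fn on_child_drop(&self, version: usize) {
        tracing::debug!(version, "example child dropped");
    }
}

struct ExampleChild {
    owner: std::sync::Weak<ExampleOwner>,
    version: usize,
}

impl Drop for ExampleChild {
    fn drop(&mut self) {
        if let Some(owner) = self.owner.upgrade() {
            // owner still alive: report which version is going away
            owner.on_child_drop(self.version);
        }
    }
}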
1670 :
1671 : impl DownloadedLayer {
1672 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`], or fails to
1673 : /// initialize it permanently.
1674 : ///
1675 : /// The `owner` parameter is a strong reference to the same `LayerInner` that
1676 : /// `DownloadedLayer::owner` would yield when upgraded. Given how this method ends up being called,
1677 : /// we will always have the LayerInner on the callstack, so we can just use it.
1678 212230 : async fn get<'a>(
1679 212230 : &'a self,
1680 212230 : owner: &Arc<LayerInner>,
1681 212230 : ctx: &RequestContext,
1682 212230 : ) -> anyhow::Result<&'a LayerKind> {
1683 212230 : let init = || async {
1684 1108 : assert_eq!(
1685 1108 : Weak::as_ptr(&self.owner),
1686 1108 : Arc::as_ptr(owner),
1687 1108 : "these are the same, just avoiding the upgrade"
1688 1108 : );
1689 1108 :
1690 1108 : let res = if owner.desc.is_delta {
1691 1108 : let summary = Some(delta_layer::Summary::expected(
1692 1018 : owner.desc.tenant_shard_id.tenant_id,
1693 1018 : owner.desc.timeline_id,
1694 1018 : owner.desc.key_range.clone(),
1695 1018 : owner.desc.lsn_range.clone(),
1696 1018 : ));
1697 1018 : delta_layer::DeltaLayerInner::load(
1698 1018 : &owner.path,
1699 1018 : summary,
1700 1018 : Some(owner.conf.max_vectored_read_bytes),
1701 1018 : ctx,
1702 1018 : )
1703 1108 : .await
1704 1108 : .map(|res| res.map(LayerKind::Delta))
1705 1108 : } else {
1706 1108 : let lsn = owner.desc.image_layer_lsn();
1707 90 : let summary = Some(image_layer::Summary::expected(
1708 90 : owner.desc.tenant_shard_id.tenant_id,
1709 90 : owner.desc.timeline_id,
1710 90 : owner.desc.key_range.clone(),
1711 90 : lsn,
1712 90 : ));
1713 90 : image_layer::ImageLayerInner::load(
1714 90 : &owner.path,
1715 90 : lsn,
1716 90 : summary,
1717 90 : Some(owner.conf.max_vectored_read_bytes),
1718 90 : ctx,
1719 90 : )
1720 1108 : .await
1721 1108 : .map(|res| res.map(LayerKind::Image))
1722 1108 : };
1723 1108 :
1724 1108 : match res {
1725 1108 : Ok(Ok(layer)) => Ok(Ok(layer)),
1726 1108 : Ok(Err(transient)) => Err(transient),
1727 1108 : Err(permanent) => {
1728 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1729 0 : // TODO(#5815): we are not logging all errors, so temporarily log them **once**
1730 0 : // here as well
1731 0 : let permanent = permanent.context("load layer");
1732 0 : tracing::error!("layer loading failed permanently: {permanent:#}");
1733 1108 : Ok(Err(permanent))
1734 1108 : }
1735 1108 : }
1736 1108 : };
1737 212230 : self.kind
1738 212230 : .get_or_try_init(init)
1739 : // return transient errors using `?`
1740 1118 : .await?
1741 212230 : .as_ref()
1742 212230 : .map_err(|e| {
1743 0 : // errors are not clonabled, cannot but stringify
1744 0 : // test_broken_timeline matches this string
1745 0 : anyhow::anyhow!("layer loading failed: {e:#}")
1746 212230 : })
1747 212230 : }
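// A hedged, standalone sketch of the `get_or_try_init` pattern above: a transient failure is
// returned as `Err`, which leaves the cell uninitialized so a later caller retries the load,
// while a permanent failure is cached inside the cell as `Ok(Err(_))` and reported on every
// later access without rerunning the loader. `ExampleCache` and `try_load_blob` are made-up
// names for illustration only.
struct ExampleCache {
    cell: tokio::sync::OnceCell<Result<Vec<u8>, String>>,
}

enum ExampleLoadError {
    Transient(std::io::Error),
    Permanent(String),
}

async fn try_load_blob(path: &Utf8Path) -> Result<Vec<u8>, ExampleLoadError> {
    // stand-in loader: I/O failures count as transient, an empty file as permanently broken
    let bytes = tokio::fs::read(path)
        .await
        .map_err(ExampleLoadError::Transient)?;
    if bytes.is_empty() {
        return Err(ExampleLoadError::Permanent("empty file".to_string()));
    }
    Ok(bytes)
}

impl ExampleCache {
    async fn get(&self, path: &Utf8Path) -> anyhow::Result<&[u8]> {
        let init = || async {
            match try_load_blob(path).await {
                Ok(bytes) => Ok(Ok(bytes)),
                // transient: not cached, the next call runs the loader again
                Err(ExampleLoadError::Transient(e)) => Err(e),
                // permanent: cached, reported on every access without retrying
                Err(ExampleLoadError::Permanent(msg)) => Ok(Err(msg)),
            }
        };
        self.cell
            .get_or_try_init(init)
            // propagate the transient error to the caller
            .await?
            .as_ref()
            .map(|bytes| bytes.as_slice())
            // cached permanent errors cannot be returned by value, so stringify them
            .map_err(|msg| anyhow::anyhow!("loading failed permanently: {msg}"))
    }
}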
1748 :
1749 211502 : async fn get_value_reconstruct_data(
1750 211502 : &self,
1751 211502 : key: Key,
1752 211502 : lsn_range: Range<Lsn>,
1753 211502 : reconstruct_data: &mut ValueReconstructState,
1754 211502 : owner: &Arc<LayerInner>,
1755 211502 : ctx: &RequestContext,
1756 211502 : ) -> anyhow::Result<ValueReconstructResult> {
1757 211502 : use LayerKind::*;
1758 211502 :
1759 211502 : match self.get(owner, ctx).await? {
1760 204429 : Delta(d) => {
1761 204429 : d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
1762 28881 : .await
1763 : }
1764 7073 : Image(i) => {
1765 7073 : i.get_value_reconstruct_data(key, reconstruct_data, ctx)
1766 734 : .await
1767 : }
1768 : }
1769 211502 : }
1770 :
1771 226 : async fn get_values_reconstruct_data(
1772 226 : &self,
1773 226 : keyspace: KeySpace,
1774 226 : lsn_range: Range<Lsn>,
1775 226 : reconstruct_data: &mut ValuesReconstructState,
1776 226 : owner: &Arc<LayerInner>,
1777 226 : ctx: &RequestContext,
1778 226 : ) -> Result<(), GetVectoredError> {
1779 226 : use LayerKind::*;
1780 226 :
1781 226 : match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
1782 152 : Delta(d) => {
1783 152 : d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
1784 10101 : .await
1785 : }
1786 74 : Image(i) => {
1787 74 : i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
1788 1230 : .await
1789 : }
1790 : }
1791 226 : }
1792 :
1793 0 : async fn load_key_values(
1794 0 : &self,
1795 0 : owner: &Arc<LayerInner>,
1796 0 : ctx: &RequestContext,
1797 0 : ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
1798 0 : use LayerKind::*;
1799 0 :
1800 0 : match self.get(owner, ctx).await? {
1801 0 : Delta(d) => d.load_key_values(ctx).await,
1802 0 : Image(i) => i.load_key_values(ctx).await,
1803 : }
1804 0 : }
1805 :
1806 4 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1807 4 : use LayerKind::*;
1808 4 : match self.get(owner, ctx).await? {
1809 4 : Delta(d) => d.dump(ctx).await?,
1810 0 : Image(i) => i.dump(ctx).await?,
1811 : }
1812 :
1813 4 : Ok(())
1814 4 : }
1815 : }
1816 :
1817 : /// Wrapper around an actual layer implementation.
1818 : #[derive(Debug)]
1819 : enum LayerKind {
1820 : Delta(delta_layer::DeltaLayerInner),
1821 : Image(image_layer::ImageLayerInner),
1822 : }
1823 :
1824 : /// Guard for forcing a layer to be resident while it exists.
1825 : #[derive(Clone)]
1826 : pub(crate) struct ResidentLayer {
1827 : owner: Layer,
1828 : downloaded: Arc<DownloadedLayer>,
1829 : }
1830 :
1831 : impl std::fmt::Display for ResidentLayer {
1832 1886 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1833 1886 : write!(f, "{}", self.owner)
1834 1886 : }
1835 : }
1836 :
1837 : impl std::fmt::Debug for ResidentLayer {
1838 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1839 0 : write!(f, "{}", self.owner)
1840 0 : }
1841 : }
1842 :
1843 : impl ResidentLayer {
1844 : /// Release the eviction guard, converting back into a plain [`Layer`].
1845 : ///
1846 : /// You can access the [`Layer`] also by using `as_ref`.
1847 420 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1848 420 : self.into()
1849 420 : }
1850 :
1851 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1852 804 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1853 : pub(crate) async fn load_keys<'a>(
1854 : &'a self,
1855 : ctx: &RequestContext,
1856 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1857 : use LayerKind::*;
1858 :
1859 : let owner = &self.owner.0;
1860 : match self.downloaded.get(owner, ctx).await? {
1861 : Delta(ref d) => {
1862 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1863 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1864 : // while it's being held.
1865 : owner
1866 : .access_stats
1867 : .record_access(LayerAccessKind::KeyIter, ctx);
1868 :
1869 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1870 : .await
1871 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1872 : }
1873 : Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
1874 : }
1875 : }
1876 :
1877 : /// Read all the keys in this layer which match the ShardIdentity, and write them all to
1878 : /// the provided writer. Return the number of keys written.
1879 16 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1880 : pub(crate) async fn filter<'a>(
1881 : &'a self,
1882 : shard_identity: &ShardIdentity,
1883 : writer: &mut ImageLayerWriter,
1884 : ctx: &RequestContext,
1885 : ) -> anyhow::Result<usize> {
1886 : use LayerKind::*;
1887 :
1888 : match self.downloaded.get(&self.owner.0, ctx).await? {
1889 : Delta(_) => anyhow::bail!(format!("cannot filter() on a delta layer {self}")),
1890 : Image(i) => i.filter(shard_identity, writer, ctx).await,
1891 : }
1892 : }
1893 :
1894 : /// Returns the number of keys and values written to the writer.
1895 10 : pub(crate) async fn copy_delta_prefix(
1896 10 : &self,
1897 10 : writer: &mut super::delta_layer::DeltaLayerWriter,
1898 10 : until: Lsn,
1899 10 : ctx: &RequestContext,
1900 10 : ) -> anyhow::Result<usize> {
1901 10 : use LayerKind::*;
1902 10 :
1903 10 : let owner = &self.owner.0;
1904 10 :
1905 10 : match self.downloaded.get(owner, ctx).await? {
1906 10 : Delta(ref d) => d
1907 10 : .copy_prefix(writer, until, ctx)
1908 13 : .await
1909 10 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1910 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1911 : }
1912 10 : }
1913 :
1914 1579 : pub(crate) fn local_path(&self) -> &Utf8Path {
1915 1579 : &self.owner.0.path
1916 1579 : }
1917 :
1918 1504 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1919 1504 : self.owner.metadata()
1920 1504 : }
1921 :
1922 : /// Cast the layer to a delta, return an error if it is an image layer.
1923 68 : pub(crate) async fn get_as_delta(
1924 68 : &self,
1925 68 : ctx: &RequestContext,
1926 68 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1927 68 : use LayerKind::*;
1928 68 : match self.downloaded.get(&self.owner.0, ctx).await? {
1929 68 : Delta(ref d) => Ok(d),
1930 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1931 : }
1932 68 : }
1933 :
1934 : /// Cast the layer to an image, return an error if it is a delta layer.
1935 10 : pub(crate) async fn get_as_image(
1936 10 : &self,
1937 10 : ctx: &RequestContext,
1938 10 : ) -> anyhow::Result<&image_layer::ImageLayerInner> {
1939 10 : use LayerKind::*;
1940 10 : match self.downloaded.get(&self.owner.0, ctx).await? {
1941 10 : Image(ref d) => Ok(d),
1942 0 : Delta(_) => Err(anyhow::anyhow!("delta layer")),
1943 : }
1944 10 : }
1945 : }
1946 :
1947 : impl AsLayerDesc for ResidentLayer {
1948 5305 : fn layer_desc(&self) -> &PersistentLayerDesc {
1949 5305 : self.owner.layer_desc()
1950 5305 : }
1951 : }
1952 :
1953 : impl AsRef<Layer> for ResidentLayer {
1954 1856 : fn as_ref(&self) -> &Layer {
1955 1856 : &self.owner
1956 1856 : }
1957 : }
1958 :
1959 : /// Drop the eviction guard.
1960 : impl From<ResidentLayer> for Layer {
1961 420 : fn from(value: ResidentLayer) -> Self {
1962 420 : value.owner
1963 420 : }
1964 : }
1965 :
1966 : use metrics::IntCounter;
1967 :
1968 : pub(crate) struct LayerImplMetrics {
1969 : started_evictions: IntCounter,
1970 : completed_evictions: IntCounter,
1971 : cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
1972 :
1973 : started_deletes: IntCounter,
1974 : completed_deletes: IntCounter,
1975 : failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
1976 :
1977 : rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
1978 : inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
1979 : redownload_after: metrics::Histogram,
1980 : time_to_evict: metrics::Histogram,
1981 : }
1982 :
1983 : impl Default for LayerImplMetrics {
1984 36 : fn default() -> Self {
1985 36 : use enum_map::Enum;
1986 36 :
1987 36 : // reminder: these will be pageserver_layer_* with "_total" suffix
1988 36 :
1989 36 : let started_evictions = metrics::register_int_counter!(
1990 : "pageserver_layer_started_evictions",
1991 : "Evictions started in the Layer implementation"
1992 : )
1993 36 : .unwrap();
1994 36 : let completed_evictions = metrics::register_int_counter!(
1995 : "pageserver_layer_completed_evictions",
1996 : "Evictions completed in the Layer implementation"
1997 : )
1998 36 : .unwrap();
1999 36 :
2000 36 : let cancelled_evictions = metrics::register_int_counter_vec!(
2001 : "pageserver_layer_cancelled_evictions_count",
2002 : "Different reasons for evictions to have been cancelled or failed",
2003 : &["reason"]
2004 : )
2005 36 : .unwrap();
2006 36 :
2007 324 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2008 324 : let reason = EvictionCancelled::from_usize(i);
2009 324 : let s = reason.as_str();
2010 324 : cancelled_evictions.with_label_values(&[s])
2011 324 : }));
2012 36 :
2013 36 : let started_deletes = metrics::register_int_counter!(
2014 : "pageserver_layer_started_deletes",
2015 : "Deletions on drop pending in the Layer implementation"
2016 : )
2017 36 : .unwrap();
2018 36 : let completed_deletes = metrics::register_int_counter!(
2019 : "pageserver_layer_completed_deletes",
2020 : "Deletions on drop completed in the Layer implementation"
2021 : )
2022 36 : .unwrap();
2023 36 :
2024 36 : let failed_deletes = metrics::register_int_counter_vec!(
2025 : "pageserver_layer_failed_deletes_count",
2026 : "Different reasons for deletions on drop to have failed",
2027 : &["reason"]
2028 : )
2029 36 : .unwrap();
2030 36 :
2031 72 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2032 72 : let reason = DeleteFailed::from_usize(i);
2033 72 : let s = reason.as_str();
2034 72 : failed_deletes.with_label_values(&[s])
2035 72 : }));
2036 36 :
2037 36 : let rare_counters = metrics::register_int_counter_vec!(
2038 : "pageserver_layer_assumed_rare_count",
2039 : "Times unexpected or assumed rare event happened",
2040 : &["event"]
2041 : )
2042 36 : .unwrap();
2043 36 :
2044 252 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2045 252 : let event = RareEvent::from_usize(i);
2046 252 : let s = event.as_str();
2047 252 : rare_counters.with_label_values(&[s])
2048 252 : }));
2049 36 :
2050 36 : let inits_cancelled = metrics::register_int_counter!(
2051 : "pageserver_layer_inits_cancelled_count",
2052 : "Times Layer initialization was cancelled",
2053 : )
2054 36 : .unwrap();
2055 36 :
2056 36 : let redownload_after = {
2057 36 : let minute = 60.0;
2058 36 : let hour = 60.0 * minute;
2059 : metrics::register_histogram!(
2060 : "pageserver_layer_redownloaded_after",
2061 : "Time between evicting and re-downloading.",
2062 : vec![
2063 : 10.0,
2064 : 30.0,
2065 : minute,
2066 : 5.0 * minute,
2067 : 15.0 * minute,
2068 : 30.0 * minute,
2069 : hour,
2070 : 12.0 * hour,
2071 : ]
2072 : )
2073 36 : .unwrap()
2074 36 : };
2075 36 :
2076 36 : let time_to_evict = metrics::register_histogram!(
2077 : "pageserver_layer_eviction_held_permit_seconds",
2078 : "Time eviction held the permit.",
2079 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
2080 : )
2081 36 : .unwrap();
2082 36 :
2083 36 : Self {
2084 36 : started_evictions,
2085 36 : completed_evictions,
2086 36 : cancelled_evictions,
2087 36 :
2088 36 : started_deletes,
2089 36 : completed_deletes,
2090 36 : failed_deletes,
2091 36 :
2092 36 : rare_counters,
2093 36 : inits_cancelled,
2094 36 : redownload_after,
2095 36 : time_to_evict,
2096 36 : }
2097 36 : }
2098 : }
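// A hedged sketch of the register-once / index-by-enum pattern used in `Default` above: the
// counter vec is registered a single time and every label value is resolved up front into an
// `EnumMap`, so incrementing on the hot path does no string lookups. The metric name and
// `ExampleReason` are made up; `metrics` is the same prometheus wrapper crate used above.
#[derive(Debug, Clone, Copy, enum_map::Enum)]
enum ExampleReason {
    Shutdown,
    Timeout,
}

impl ExampleReason {
    fn as_str(&self) -> &'static str {
        match self {
            ExampleReason::Shutdown => "shutdown",
            ExampleReason::Timeout => "timeout",
        }
    }
}

fn example_reason_counters() -> enum_map::EnumMap<ExampleReason, metrics::IntCounter> {
    use enum_map::Enum;

    // register the labelled counter family once
    let vec = metrics::register_int_counter_vec!(
        "pageserver_example_cancelled_count",
        "Example only: different reasons for cancellation",
        &["reason"]
    )
    .unwrap();

    // pre-resolve each label into its concrete counter, indexed by the enum
    enum_map::EnumMap::from_array(std::array::from_fn(|i| {
        let reason = ExampleReason::from_usize(i);
        vec.with_label_values(&[reason.as_str()])
    }))
}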
2099 :
2100 : impl LayerImplMetrics {
2101 26 : fn inc_started_evictions(&self) {
2102 26 : self.started_evictions.inc();
2103 26 : }
2104 18 : fn inc_completed_evictions(&self) {
2105 18 : self.completed_evictions.inc();
2106 18 : }
2107 8 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2108 8 : self.cancelled_evictions[reason].inc()
2109 8 : }
2110 :
2111 434 : fn inc_started_deletes(&self) {
2112 434 : self.started_deletes.inc();
2113 434 : }
2114 434 : fn inc_completed_deletes(&self) {
2115 434 : self.completed_deletes.inc();
2116 434 : }
2117 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2118 0 : self.failed_deletes[reason].inc();
2119 0 : }
2120 :
2121 : /// Counted separately from failed layer deletes because we will complete the layer deletion
2122 : /// attempt regardless of failure to delete local file.
2123 0 : fn inc_delete_removes_failed(&self) {
2124 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2125 0 : }
2126 :
2127 : /// Expected to be rare, just as cancellations are rare, but we could have cancellations separate from
2128 : /// the single caller which can start the download, so use this counter to separate them.
2129 0 : fn inc_init_completed_without_requester(&self) {
2130 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2131 0 : }
2132 :
2133 : /// Expected to be rare because cancellations are unexpected, and failures are unexpected
2134 0 : fn inc_download_failed_without_requester(&self) {
2135 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2136 0 : }
2137 :
2138 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2139 : ///
2140 : /// If this counter is always zero, we should replace the ResidentOrWantedEvicted type with an
2141 : /// Option.
2142 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2143 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2144 0 : }
2145 :
2146 : /// These are only expected for [`Self::inc_init_cancelled`] amount when
2147 : /// running with remote storage.
2148 6 : fn inc_init_needed_no_download(&self) {
2149 6 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2150 6 : }
2151 :
2152 : /// Expected to be rare because all layer files should be readable and valid
2153 0 : fn inc_permanent_loading_failures(&self) {
2154 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2155 0 : }
2156 :
2157 0 : fn inc_init_cancelled(&self) {
2158 0 : self.inits_cancelled.inc()
2159 0 : }
2160 :
2161 6 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2162 6 : self.redownload_after.observe(duration.as_secs_f64())
2163 6 : }
2164 :
2165 : /// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
2166 : /// instead cancel eviction if we would have read waiters. We cannot however separate reads
2167 : /// from other evictions, so this could have noise as well.
2168 0 : fn inc_evicted_with_waiters(&self) {
2169 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2170 0 : }
2171 :
2172 : /// Recorded at least initially as the permit is now acquired in async context before
2173 : /// spawn_blocking action.
2174 18 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2175 18 : self.time_to_evict.observe(duration.as_secs_f64())
2176 18 : }
2177 : }
2178 :
2179 : #[derive(Debug, Clone, Copy, enum_map::Enum)]
2180 : enum EvictionCancelled {
2181 : LayerGone,
2182 : TimelineGone,
2183 : VersionCheckFailed,
2184 : FileNotFound,
2185 : RemoveFailed,
2186 : AlreadyReinitialized,
2187 : /// Not evicted because of a pending reinitialization
2188 : LostToDownload,
2189 : /// After eviction, there was a new layer access which cancelled the eviction.
2190 : UpgradedBackOnAccess,
2191 : UnexpectedEvictedState,
2192 : }
2193 :
2194 : impl EvictionCancelled {
2195 324 : fn as_str(&self) -> &'static str {
2196 324 : match self {
2197 36 : EvictionCancelled::LayerGone => "layer_gone",
2198 36 : EvictionCancelled::TimelineGone => "timeline_gone",
2199 36 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2200 36 : EvictionCancelled::FileNotFound => "file_not_found",
2201 36 : EvictionCancelled::RemoveFailed => "remove_failed",
2202 36 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2203 36 : EvictionCancelled::LostToDownload => "lost_to_download",
2204 36 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2205 36 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2206 : }
2207 324 : }
2208 : }
2209 :
2210 : #[derive(enum_map::Enum)]
2211 : enum DeleteFailed {
2212 : TimelineGone,
2213 : DeleteSchedulingFailed,
2214 : }
2215 :
2216 : impl DeleteFailed {
2217 72 : fn as_str(&self) -> &'static str {
2218 72 : match self {
2219 36 : DeleteFailed::TimelineGone => "timeline_gone",
2220 36 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2221 : }
2222 72 : }
2223 : }
2224 :
2225 : #[derive(enum_map::Enum)]
2226 : enum RareEvent {
2227 : RemoveOnDropFailed,
2228 : InitCompletedWithoutRequester,
2229 : DownloadFailedWithoutRequester,
2230 : UpgradedWantedEvicted,
2231 : InitWithoutDownload,
2232 : PermanentLoadingFailure,
2233 : EvictedWithWaiters,
2234 : }
2235 :
2236 : impl RareEvent {
2237 252 : fn as_str(&self) -> &'static str {
2238 252 : use RareEvent::*;
2239 252 :
2240 252 : match self {
2241 36 : RemoveOnDropFailed => "remove_on_drop_failed",
2242 36 : InitCompletedWithoutRequester => "init_completed_without",
2243 36 : DownloadFailedWithoutRequester => "download_failed_without",
2244 36 : UpgradedWantedEvicted => "raced_wanted_evicted",
2245 36 : InitWithoutDownload => "init_needed_no_download",
2246 36 : PermanentLoadingFailure => "permanent_loading_failure",
2247 36 : EvictedWithWaiters => "evicted_with_waiters",
2248 : }
2249 252 : }
2250 : }
2251 :
2252 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2253 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|