Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use pageserver_api::keyspace::KeySpace;
4 : use pageserver_api::models::HistoricLayerInfo;
5 : use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
6 : use std::ops::Range;
7 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 : use std::sync::{Arc, Weak};
9 : use std::time::{Duration, SystemTime};
10 : use tracing::Instrument;
11 : use utils::id::TimelineId;
12 : use utils::lsn::Lsn;
13 : use utils::sync::{gate, heavier_once_cell};
14 :
15 : use crate::config::PageServerConf;
16 : use crate::context::{DownloadBehavior, RequestContext};
17 : use crate::repository::Key;
18 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
19 : use crate::task_mgr::TaskKind;
20 : use crate::tenant::timeline::{CompactionError, GetVectoredError};
21 : use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
22 :
23 : use super::delta_layer::{self, DeltaEntry};
24 : use super::image_layer::{self};
25 : use super::{
26 : AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
27 : LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
28 : };
29 :
30 : use utils::generation::Generation;
31 :
32 : #[cfg(test)]
33 : mod tests;
34 :
35 : #[cfg(test)]
36 : mod failpoints;
37 :
/// Size limit in bytes (4.5 * 10^9).
///
/// NOTE(review): judging by the name this is presumably the largest object size accepted for
/// an S3 upload; its users are not visible in this part of the file — confirm.
pub const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
39 :
40 : /// A Layer contains all data in a "rectangle" consisting of a range of keys and
41 : /// range of LSNs.
42 : ///
43 : /// There are two kinds of layers, in-memory and on-disk layers. In-memory
44 : /// layers are used to ingest incoming WAL, and provide fast access to the
45 : /// recent page versions. On-disk layers are stored as files on disk, and are
46 : /// immutable. This type represents the on-disk kind while in-memory kind are represented by
47 : /// [`InMemoryLayer`].
48 : ///
49 : /// Furthermore, there are two kinds of on-disk layers: delta and image layers.
50 : /// A delta layer contains all modifications within a range of LSNs and keys.
51 : /// An image layer is a snapshot of all the data in a key-range, at a single
52 : /// LSN.
53 : ///
54 : /// This type models the on-disk layers, which can be evicted and on-demand downloaded. As a
55 : /// general goal, read accesses should always win eviction and eviction should not wait for
56 : /// download.
57 : ///
58 : /// ### State transitions
59 : ///
60 : /// The internal state of `Layer` is composed of most importantly the on-filesystem state and the
61 : /// [`ResidentOrWantedEvicted`] enum. On-filesystem state can be either present (fully downloaded,
62 : /// right size) or deleted.
63 : ///
64 : /// Reads will always win requests to evict until `wait_for_turn_and_evict` has acquired the
65 : /// `heavier_once_cell::InitPermit` and has started to `evict_blocking`. Before the
66 : /// `heavier_once_cell::InitPermit` has been acquired, any read request
67 : /// (`get_or_maybe_download`) can "re-initialize" using the existing downloaded file and thus
68 : /// cancelling the eviction.
69 : ///
70 : /// ```text
71 : /// +-----------------+ get_or_maybe_download +--------------------------------+
72 : /// | not initialized |--------------------------->| Resident(Arc<DownloadedLayer>) |
73 : /// | ENOENT | /->| |
74 : /// +-----------------+ | +--------------------------------+
75 : /// ^ | | ^
76 : /// | get_or_maybe_download | | | get_or_maybe_download, either:
77 : /// evict_blocking | /-------------------------/ | | - upgrade weak to strong
78 : /// | | | | - re-initialize without download
79 : /// | | evict_and_wait | |
80 : /// +-----------------+ v |
81 : /// | not initialized | on_downloaded_layer_drop +--------------------------------------+
82 : /// | file is present |<---------------------------| WantedEvicted(Weak<DownloadedLayer>) |
83 : /// +-----------------+ +--------------------------------------+
84 : /// ```
85 : ///
86 : /// ### Unsupported
87 : ///
88 : /// - Evicting by the operator deleting files from the filesystem
89 : ///
90 : /// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer
91 : #[derive(Clone)]
92 : pub(crate) struct Layer(Arc<LayerInner>);
93 :
94 : impl std::fmt::Display for Layer {
95 5754 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 5754 : write!(
97 5754 : f,
98 5754 : "{}{}",
99 5754 : self.layer_desc().short_id(),
100 5754 : self.0.generation.get_suffix()
101 5754 : )
102 5754 : }
103 : }
104 :
105 : impl std::fmt::Debug for Layer {
106 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 0 : write!(f, "{}", self)
108 0 : }
109 : }
110 :
111 : impl AsLayerDesc for Layer {
112 1512608 : fn layer_desc(&self) -> &PersistentLayerDesc {
113 1512608 : self.0.layer_desc()
114 1512608 : }
115 : }
116 :
117 : impl PartialEq for Layer {
118 9 : fn eq(&self, other: &Self) -> bool {
119 9 : Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
120 9 : }
121 : }
122 :
123 5124 : pub(crate) fn local_layer_path(
124 5124 : conf: &PageServerConf,
125 5124 : tenant_shard_id: &TenantShardId,
126 5124 : timeline_id: &TimelineId,
127 5124 : layer_file_name: &LayerName,
128 5124 : generation: &Generation,
129 5124 : ) -> Utf8PathBuf {
130 5124 : let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
131 5124 :
132 5124 : if generation.is_none() {
133 : // Without a generation, we may only use legacy path style
134 0 : timeline_path.join(layer_file_name.to_string())
135 : } else {
136 5124 : timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
137 : }
138 5124 : }
139 :
140 : impl Layer {
141 : /// Creates a layer value for a file we know to not be resident.
142 0 : pub(crate) fn for_evicted(
143 0 : conf: &'static PageServerConf,
144 0 : timeline: &Arc<Timeline>,
145 0 : file_name: LayerName,
146 0 : metadata: LayerFileMetadata,
147 0 : ) -> Self {
148 0 : let local_path = local_layer_path(
149 0 : conf,
150 0 : &timeline.tenant_shard_id,
151 0 : &timeline.timeline_id,
152 0 : &file_name,
153 0 : &metadata.generation,
154 0 : );
155 0 :
156 0 : let desc = PersistentLayerDesc::from_filename(
157 0 : timeline.tenant_shard_id,
158 0 : timeline.timeline_id,
159 0 : file_name,
160 0 : metadata.file_size,
161 0 : );
162 0 :
163 0 : let owner = Layer(Arc::new(LayerInner::new(
164 0 : conf,
165 0 : timeline,
166 0 : local_path,
167 0 : desc,
168 0 : None,
169 0 : metadata.generation,
170 0 : metadata.shard,
171 0 : )));
172 0 :
173 0 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
174 :
175 0 : owner
176 0 : }
177 :
178 : /// Creates a Layer value for a file we know to be resident in timeline directory.
179 72 : pub(crate) fn for_resident(
180 72 : conf: &'static PageServerConf,
181 72 : timeline: &Arc<Timeline>,
182 72 : local_path: Utf8PathBuf,
183 72 : file_name: LayerName,
184 72 : metadata: LayerFileMetadata,
185 72 : ) -> ResidentLayer {
186 72 : let desc = PersistentLayerDesc::from_filename(
187 72 : timeline.tenant_shard_id,
188 72 : timeline.timeline_id,
189 72 : file_name,
190 72 : metadata.file_size,
191 72 : );
192 72 :
193 72 : let mut resident = None;
194 72 :
195 72 : let owner = Layer(Arc::new_cyclic(|owner| {
196 72 : let inner = Arc::new(DownloadedLayer {
197 72 : owner: owner.clone(),
198 72 : kind: tokio::sync::OnceCell::default(),
199 72 : version: 0,
200 72 : });
201 72 : resident = Some(inner.clone());
202 72 :
203 72 : LayerInner::new(
204 72 : conf,
205 72 : timeline,
206 72 : local_path,
207 72 : desc,
208 72 : Some(inner),
209 72 : metadata.generation,
210 72 : metadata.shard,
211 72 : )
212 72 : }));
213 72 :
214 72 : let downloaded = resident.expect("just initialized");
215 72 :
216 72 : debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
217 :
218 72 : timeline
219 72 : .metrics
220 72 : .resident_physical_size_add(metadata.file_size);
221 72 :
222 72 : ResidentLayer { downloaded, owner }
223 72 : }
224 :
225 : /// Creates a Layer value for freshly written out new layer file by renaming it from a
226 : /// temporary path.
227 5100 : pub(crate) fn finish_creating(
228 5100 : conf: &'static PageServerConf,
229 5100 : timeline: &Arc<Timeline>,
230 5100 : desc: PersistentLayerDesc,
231 5100 : temp_path: &Utf8Path,
232 5100 : ) -> anyhow::Result<ResidentLayer> {
233 5100 : let mut resident = None;
234 5100 :
235 5100 : let owner = Layer(Arc::new_cyclic(|owner| {
236 5100 : let inner = Arc::new(DownloadedLayer {
237 5100 : owner: owner.clone(),
238 5100 : kind: tokio::sync::OnceCell::default(),
239 5100 : version: 0,
240 5100 : });
241 5100 : resident = Some(inner.clone());
242 5100 :
243 5100 : let local_path = local_layer_path(
244 5100 : conf,
245 5100 : &timeline.tenant_shard_id,
246 5100 : &timeline.timeline_id,
247 5100 : &desc.layer_name(),
248 5100 : &timeline.generation,
249 5100 : );
250 5100 :
251 5100 : LayerInner::new(
252 5100 : conf,
253 5100 : timeline,
254 5100 : local_path,
255 5100 : desc,
256 5100 : Some(inner),
257 5100 : timeline.generation,
258 5100 : timeline.get_shard_index(),
259 5100 : )
260 5100 : }));
261 5100 :
262 5100 : let downloaded = resident.expect("just initialized");
263 5100 :
264 5100 : // We never want to overwrite an existing file, so we use `RENAME_NOREPLACE`.
265 5100 : // TODO: this leaves the temp file in place if the rename fails, risking us running
266 5100 : // out of space. Should we clean it up here or does the calling context deal with this?
267 5100 : utils::fs_ext::rename_noreplace(temp_path.as_std_path(), owner.local_path().as_std_path())
268 5100 : .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
269 :
270 5100 : Ok(ResidentLayer { downloaded, owner })
271 5100 : }
272 :
273 : /// Requests the layer to be evicted and waits for this to be done.
274 : ///
275 : /// If the file is not resident, an [`EvictionError::NotFound`] is returned.
276 : ///
277 : /// If for a bad luck or blocking of the executor, we miss the actual eviction and the layer is
278 : /// re-downloaded, [`EvictionError::Downloaded`] is returned.
279 : ///
280 : /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction
281 : /// will happen regardless the future returned by this method completing unless there is a
282 : /// read access before eviction gets to complete.
283 : ///
284 : /// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
285 : /// of download-evict cycle on retry.
286 108 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
287 146 : self.0.evict_and_wait(timeout).await
288 96 : }
289 :
290 : /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
291 : /// then.
292 : ///
293 : /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
294 : /// This means that the unlinking by [gc] or [compaction] must have happened strictly before
295 : /// the value this is called on gets dropped.
296 : ///
297 : /// This is ensured by both of those methods accepting references to Layer.
298 : ///
299 : /// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
300 : /// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
301 1410 : pub(crate) fn delete_on_drop(&self) {
302 1410 : self.0.delete_on_drop();
303 1410 : }
304 :
305 638797 : pub(crate) async fn get_values_reconstruct_data(
306 638797 : &self,
307 638797 : keyspace: KeySpace,
308 638797 : lsn_range: Range<Lsn>,
309 638797 : reconstruct_data: &mut ValuesReconstructState,
310 638797 : ctx: &RequestContext,
311 638797 : ) -> Result<(), GetVectoredError> {
312 638797 : let layer = self
313 638797 : .0
314 638797 : .get_or_maybe_download(true, Some(ctx))
315 10 : .await
316 638797 : .map_err(|err| match err {
317 : DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
318 0 : GetVectoredError::Cancelled
319 : }
320 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
321 638797 : })?;
322 :
323 638797 : self.record_access(ctx);
324 638797 :
325 638797 : layer
326 638797 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
327 638797 : .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
328 286099 : .await
329 638797 : .map_err(|err| match err {
330 0 : GetVectoredError::Other(err) => GetVectoredError::Other(
331 0 : err.context(format!("get_values_reconstruct_data for layer {self}")),
332 0 : ),
333 0 : err => err,
334 638797 : })
335 638797 : }
336 :
337 : /// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
338 : #[allow(dead_code)]
339 0 : pub(crate) async fn load_key_values(
340 0 : &self,
341 0 : ctx: &RequestContext,
342 0 : ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
343 0 : let layer = self
344 0 : .0
345 0 : .get_or_maybe_download(true, Some(ctx))
346 0 : .await
347 0 : .map_err(|err| match err {
348 0 : DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
349 0 : other => GetVectoredError::Other(anyhow::anyhow!(other)),
350 0 : })?;
351 0 : layer.load_key_values(&self.0, ctx).await
352 0 : }
353 :
354 : /// Download the layer if evicted.
355 : ///
356 : /// Will not error when the layer is already downloaded.
357 0 : pub(crate) async fn download(&self) -> anyhow::Result<()> {
358 0 : self.0.get_or_maybe_download(true, None).await?;
359 0 : Ok(())
360 0 : }
361 :
/// Assuming the layer is already downloaded, returns a guard which prohibits eviction for
/// as long as the guard exists.
///
/// Returns `None` if the layer is currently evicted or becoming evicted.
#[cfg(test)]
pub(crate) async fn keep_resident(&self) -> Option<ResidentLayer> {
    let downloaded = match self.0.inner.get() {
        Some(state) => state.get()?,
        None => return None,
    };

    let owner = self.clone();
    Some(ResidentLayer { downloaded, owner })
}
375 :
376 : /// Weak indicator of is the layer resident or not. Good enough for eviction, which can deal
377 : /// with `EvictionError::NotFound`.
378 : ///
379 : /// Returns `true` if this layer might be resident, or `false`, if it most likely evicted or
380 : /// will be unless a read happens soon.
381 159 : pub(crate) fn is_likely_resident(&self) -> bool {
382 159 : self.0
383 159 : .inner
384 159 : .get()
385 159 : .map(|rowe| rowe.is_likely_resident())
386 159 : .unwrap_or(false)
387 159 : }
388 :
389 : /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
390 1488 : pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
391 1488 : let downloaded = self.0.get_or_maybe_download(true, None).await?;
392 :
393 1488 : Ok(ResidentLayer {
394 1488 : downloaded,
395 1488 : owner: self.clone(),
396 1488 : })
397 1488 : }
398 :
399 0 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
400 0 : self.0.info(reset)
401 0 : }
402 :
403 24 : pub(crate) fn latest_activity(&self) -> SystemTime {
404 24 : self.0.access_stats.latest_activity()
405 24 : }
406 :
407 30 : pub(crate) fn visibility(&self) -> LayerVisibilityHint {
408 30 : self.0.access_stats.visibility()
409 30 : }
410 :
411 5106 : pub(crate) fn local_path(&self) -> &Utf8Path {
412 5106 : &self.0.path
413 5106 : }
414 :
415 5802 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
416 5802 : self.0.metadata()
417 5802 : }
418 :
419 0 : pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
420 0 : self.0
421 0 : .timeline
422 0 : .upgrade()
423 0 : .map(|timeline| timeline.timeline_id)
424 0 : }
425 :
426 : /// Traditional debug dumping facility
427 : #[allow(unused)]
428 12 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
429 12 : self.0.desc.dump();
430 12 :
431 12 : if verbose {
432 : // for now, unconditionally download everything, even if that might not be wanted.
433 12 : let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
434 24 : l.dump(&self.0, ctx).await?
435 0 : }
436 :
437 12 : Ok(())
438 12 : }
439 :
/// Waits until this layer has been dropped (and, if needed, local file deletion and remote
/// deletion scheduling have completed).
///
/// Does not start local deletion; use [`Self::delete_on_drop`] for that separately.
///
/// The returned future subscribes to the status channel up front and resolves once waiting
/// for a change fails, i.e. once the sending side of the channel is gone.
#[cfg(any(feature = "testing", test))]
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
    let mut rx = self.0.status.as_ref().unwrap().subscribe();

    async move {
        // intermediate status changes are of no interest, keep waiting until the error
        while rx.changed().await.is_ok() {}
    }
}
457 :
458 640003 : fn record_access(&self, ctx: &RequestContext) {
459 640003 : if self.0.access_stats.record_access(ctx) {
460 : // Visibility was modified to Visible
461 0 : tracing::info!(
462 0 : "Layer {} became visible as a result of access",
463 0 : self.0.desc.key()
464 : );
465 0 : if let Some(tl) = self.0.timeline.upgrade() {
466 0 : tl.metrics
467 0 : .visible_physical_size_gauge
468 0 : .add(self.0.desc.file_size)
469 0 : }
470 640003 : }
471 640003 : }
472 :
473 8070 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
474 8070 : let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
475 8070 : use LayerVisibilityHint::*;
476 8070 : match (old_visibility, visibility) {
477 : (Visible, Covered) => {
478 : // Subtract this layer's contribution to the visible size metric
479 108 : if let Some(tl) = self.0.timeline.upgrade() {
480 108 : debug_assert!(
481 108 : tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
482 : );
483 108 : tl.metrics
484 108 : .visible_physical_size_gauge
485 108 : .sub(self.0.desc.file_size)
486 0 : }
487 : }
488 : (Covered, Visible) => {
489 : // Add this layer's contribution to the visible size metric
490 0 : if let Some(tl) = self.0.timeline.upgrade() {
491 0 : tl.metrics
492 0 : .visible_physical_size_gauge
493 0 : .add(self.0.desc.file_size)
494 0 : }
495 : }
496 7962 : (Covered, Covered) | (Visible, Visible) => {
497 7962 : // no change
498 7962 : }
499 : }
500 8070 : }
501 : }
502 :
503 : /// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
504 : ///
505 : /// However when we want something evicted, we cannot evict it right away as there might be current
506 : /// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
507 : /// read with [`Layer::get_values_reconstruct_data`].
508 : ///
509 : /// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
510 : #[derive(Debug)]
511 : enum ResidentOrWantedEvicted {
512 : Resident(Arc<DownloadedLayer>),
513 : WantedEvicted(Weak<DownloadedLayer>, usize),
514 : }
515 :
516 : impl ResidentOrWantedEvicted {
517 : /// Non-mutating access to the a DownloadedLayer, if possible.
518 : ///
519 : /// This is not used on the read path (anything that calls
520 : /// [`LayerInner::get_or_maybe_download`]) because it was decided that reads always win
521 : /// evictions, and part of that winning is using [`ResidentOrWantedEvicted::get_and_upgrade`].
522 : #[cfg(test)]
523 42 : fn get(&self) -> Option<Arc<DownloadedLayer>> {
524 42 : match self {
525 42 : ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
526 0 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.upgrade(),
527 : }
528 42 : }
529 :
530 : /// Best-effort query for residency right now, not as strong guarantee as receiving a strong
531 : /// reference from `ResidentOrWantedEvicted::get`.
532 141 : fn is_likely_resident(&self) -> bool {
533 141 : match self {
534 123 : ResidentOrWantedEvicted::Resident(_) => true,
535 18 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => weak.strong_count() > 0,
536 : }
537 141 : }
538 :
539 : /// Upgrades any weak to strong if possible.
540 : ///
541 : /// Returns a strong reference if possible, along with a boolean telling if an upgrade
542 : /// happened.
543 640309 : fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
544 640309 : match self {
545 640285 : ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
546 24 : ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
547 0 : Some(strong) => {
548 0 : LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
549 0 :
550 0 : *self = ResidentOrWantedEvicted::Resident(strong.clone());
551 0 :
552 0 : Some((strong, true))
553 : }
554 24 : None => None,
555 : },
556 : }
557 640309 : }
558 :
559 : /// When eviction is first requested, drop down to holding a [`Weak`].
560 : ///
561 : /// Returns `Some` if this was the first time eviction was requested. Care should be taken to
562 : /// drop the possibly last strong reference outside of the mutex of
563 : /// [`heavier_once_cell::OnceCell`].
564 90 : fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
565 90 : match self {
566 78 : ResidentOrWantedEvicted::Resident(strong) => {
567 78 : let weak = Arc::downgrade(strong);
568 78 : let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
569 78 : std::mem::swap(self, &mut temp);
570 78 : match temp {
571 78 : ResidentOrWantedEvicted::Resident(strong) => Some(strong),
572 0 : ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
573 : }
574 : }
575 12 : ResidentOrWantedEvicted::WantedEvicted(..) => None,
576 : }
577 90 : }
578 : }
579 :
580 : struct LayerInner {
581 : /// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of
582 : /// [`Self::path`].
583 : conf: &'static PageServerConf,
584 :
585 : /// Full path to the file; unclear if this should exist anymore.
586 : path: Utf8PathBuf,
587 :
588 : desc: PersistentLayerDesc,
589 :
590 : /// Timeline access is needed for remote timeline client and metrics.
591 : ///
592 : /// There should not be an access to timeline for any reason without entering the
593 : /// [`Timeline::gate`] at the same time.
594 : timeline: Weak<Timeline>,
595 :
596 : access_stats: LayerAccessStats,
597 :
598 : /// This custom OnceCell is backed by std mutex, but only held for short time periods.
599 : ///
600 : /// Filesystem changes (download, evict) are only done while holding a permit which the
601 : /// `heavier_once_cell` provides.
602 : ///
603 : /// A number of fields in `Layer` are meant to only be updated when holding the InitPermit, but
604 : /// possibly read while not holding it.
605 : inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
606 :
607 : /// Do we want to delete locally and remotely this when `LayerInner` is dropped
608 : wanted_deleted: AtomicBool,
609 :
610 : /// Version is to make sure we will only evict a specific initialization of the downloaded file.
611 : ///
612 : /// Incremented for each initialization, stored in `DownloadedLayer::version` or
613 : /// `ResidentOrWantedEvicted::WantedEvicted`.
614 : version: AtomicUsize,
615 :
616 : /// Allow subscribing to when the layer actually gets evicted, a non-cancellable download
617 : /// starts, or completes.
618 : ///
619 : /// Updates must only be posted while holding the InitPermit or the heavier_once_cell::Guard.
620 : /// Holding the InitPermit is the only time we can do state transitions, but we also need to
621 : /// cancel a pending eviction on upgrading a [`ResidentOrWantedEvicted::WantedEvicted`] back to
622 : /// [`ResidentOrWantedEvicted::Resident`] on access.
623 : ///
624 : /// The sender is wrapped in an Option to facilitate moving it out on [`LayerInner::drop`].
625 : status: Option<tokio::sync::watch::Sender<Status>>,
626 :
627 : /// Counter for exponential backoff with the download.
628 : ///
629 : /// This is atomic only for the purposes of having additional data only accessed while holding
630 : /// the InitPermit.
631 : consecutive_failures: AtomicUsize,
632 :
633 : /// The generation of this Layer.
634 : ///
635 : /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
636 : /// for created layers from [`Timeline::generation`].
637 : generation: Generation,
638 :
639 : /// The shard of this Layer.
640 : ///
641 : /// For layers created in this process, this will always be the [`ShardIndex`] of the
642 : /// current `ShardIdentity`` (TODO: add link once it's introduced).
643 : ///
644 : /// For loaded layers, this may be some other value if the tenant has undergone
645 : /// a shard split since the layer was originally written.
646 : shard: ShardIndex,
647 :
648 : /// When the Layer was last evicted but has not been downloaded since.
649 : ///
650 : /// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
651 : last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
652 :
653 : #[cfg(test)]
654 : failpoints: std::sync::Mutex<Vec<failpoints::Failpoint>>,
655 : }
656 :
657 : impl std::fmt::Display for LayerInner {
658 90 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
659 90 : write!(f, "{}", self.layer_desc().short_id())
660 90 : }
661 : }
662 :
663 : impl AsLayerDesc for LayerInner {
664 1519643 : fn layer_desc(&self) -> &PersistentLayerDesc {
665 1519643 : &self.desc
666 1519643 : }
667 : }
668 :
/// Coarse residency status published through the `LayerInner::status` watch channel.
#[derive(Debug, Clone, Copy)]
enum Status {
    /// Initial status of a layer constructed with a downloaded file.
    Resident,
    /// Initial status of a layer constructed without a downloaded file.
    Evicted,
    /// NOTE(review): presumably published while a download is in progress; where it is
    /// posted is not visible in this part of the file — confirm.
    Downloading,
}
675 :
676 : impl Drop for LayerInner {
677 1725 : fn drop(&mut self) {
678 : // if there was a pending eviction, mark it cancelled here to balance metrics
679 1725 : if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
680 6 : {
681 6 : // eviction has already been started
682 6 : LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
683 6 :
684 6 : // eviction request is intentionally not honored as no one is present to wait for it
685 6 : // and we could be delaying shutdown for nothing.
686 1719 : }
687 :
688 1725 : if let Some(timeline) = self.timeline.upgrade() {
689 : // Only need to decrement metrics if the timeline still exists: otherwise
690 : // it will have already de-registered these metrics via TimelineMetrics::shutdown
691 1677 : if self.desc.is_delta() {
692 1484 : timeline.metrics.layer_count_delta.dec();
693 1484 : timeline.metrics.layer_size_delta.sub(self.desc.file_size);
694 1484 : } else {
695 193 : timeline.metrics.layer_count_image.dec();
696 193 : timeline.metrics.layer_size_image.sub(self.desc.file_size);
697 193 : }
698 :
699 1677 : if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
700 1659 : debug_assert!(
701 1659 : timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
702 : );
703 1659 : timeline
704 1659 : .metrics
705 1659 : .visible_physical_size_gauge
706 1659 : .sub(self.desc.file_size);
707 18 : }
708 48 : }
709 :
710 1725 : if !*self.wanted_deleted.get_mut() {
711 336 : return;
712 1389 : }
713 :
714 1389 : let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
715 :
716 1389 : let path = std::mem::take(&mut self.path);
717 1389 : let file_name = self.layer_desc().layer_name();
718 1389 : let file_size = self.layer_desc().file_size;
719 1389 : let timeline = self.timeline.clone();
720 1389 : let meta = self.metadata();
721 1389 : let status = self.status.take();
722 1389 :
723 1389 : Self::spawn_blocking(move || {
724 1387 : let _g = span.entered();
725 1387 :
726 1387 : // carry this until we are finished for [`Layer::wait_drop`] support
727 1387 : let _status = status;
728 :
729 1387 : let Some(timeline) = timeline.upgrade() else {
730 : // no need to nag that timeline is gone: under normal situation on
731 : // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
732 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
733 0 : return;
734 : };
735 :
736 1387 : let Ok(_guard) = timeline.gate.enter() else {
737 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
738 0 : return;
739 : };
740 :
741 1387 : let removed = match std::fs::remove_file(path) {
742 1381 : Ok(()) => true,
743 6 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
744 6 : // until we no longer do detaches by removing all local files before removing the
745 6 : // tenant from the global map, we will always get these errors even if we knew what
746 6 : // is the latest state.
747 6 : //
748 6 : // we currently do not track the latest state, so we'll also end up here on evicted
749 6 : // layers.
750 6 : false
751 : }
752 0 : Err(e) => {
753 0 : tracing::error!("failed to remove wanted deleted layer: {e}");
754 0 : LAYER_IMPL_METRICS.inc_delete_removes_failed();
755 0 : false
756 : }
757 : };
758 :
759 1387 : if removed {
760 1381 : timeline.metrics.resident_physical_size_sub(file_size);
761 1381 : }
762 1387 : let res = timeline
763 1387 : .remote_client
764 1387 : .schedule_deletion_of_unlinked(vec![(file_name, meta)]);
765 :
766 1387 : if let Err(e) = res {
767 : // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
768 : // demonstrating this deadlock (without spawn_blocking): stop will drop
769 : // queued items, which will have ResidentLayer's, and those drops would try
770 : // to re-entrantly lock the RemoteTimelineClient inner state.
771 0 : if !timeline.is_active() {
772 0 : tracing::info!("scheduling deletion on drop failed: {e:#}");
773 : } else {
774 0 : tracing::warn!("scheduling deletion on drop failed: {e:#}");
775 : }
776 0 : LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
777 1387 : } else {
778 1387 : LAYER_IMPL_METRICS.inc_completed_deletes();
779 1387 : }
780 1389 : });
781 1725 : }
782 : }
783 :
784 : impl LayerInner {
785 : #[allow(clippy::too_many_arguments)]
786 5172 : fn new(
787 5172 : conf: &'static PageServerConf,
788 5172 : timeline: &Arc<Timeline>,
789 5172 : local_path: Utf8PathBuf,
790 5172 : desc: PersistentLayerDesc,
791 5172 : downloaded: Option<Arc<DownloadedLayer>>,
792 5172 : generation: Generation,
793 5172 : shard: ShardIndex,
794 5172 : ) -> Self {
795 5172 : let (inner, version, init_status) = if let Some(inner) = downloaded {
796 5172 : let version = inner.version;
797 5172 : let resident = ResidentOrWantedEvicted::Resident(inner);
798 5172 : (
799 5172 : heavier_once_cell::OnceCell::new(resident),
800 5172 : version,
801 5172 : Status::Resident,
802 5172 : )
803 : } else {
804 0 : (heavier_once_cell::OnceCell::default(), 0, Status::Evicted)
805 : };
806 :
807 : // This object acts as a RAII guard on these metrics: increment on construction
808 5172 : if desc.is_delta() {
809 4260 : timeline.metrics.layer_count_delta.inc();
810 4260 : timeline.metrics.layer_size_delta.add(desc.file_size);
811 4260 : } else {
812 912 : timeline.metrics.layer_count_image.inc();
813 912 : timeline.metrics.layer_size_image.add(desc.file_size);
814 912 : }
815 :
816 : // New layers are visible by default. This metric is later updated on drop or in set_visibility
817 5172 : timeline
818 5172 : .metrics
819 5172 : .visible_physical_size_gauge
820 5172 : .add(desc.file_size);
821 5172 :
822 5172 : LayerInner {
823 5172 : conf,
824 5172 : path: local_path,
825 5172 : desc,
826 5172 : timeline: Arc::downgrade(timeline),
827 5172 : access_stats: Default::default(),
828 5172 : wanted_deleted: AtomicBool::new(false),
829 5172 : inner,
830 5172 : version: AtomicUsize::new(version),
831 5172 : status: Some(tokio::sync::watch::channel(init_status).0),
832 5172 : consecutive_failures: AtomicUsize::new(0),
833 5172 : generation,
834 5172 : shard,
835 5172 : last_evicted_at: std::sync::Mutex::default(),
836 5172 : #[cfg(test)]
837 5172 : failpoints: Default::default(),
838 5172 : }
839 5172 : }
840 :
841 1410 : fn delete_on_drop(&self) {
842 1410 : let res =
843 1410 : self.wanted_deleted
844 1410 : .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
845 1410 :
846 1410 : if res.is_ok() {
847 1398 : LAYER_IMPL_METRICS.inc_started_deletes();
848 1398 : }
849 1410 : }
850 :
851 : /// Cancellation safe, however dropping the future and calling this method again might result
852 : /// in a new attempt to evict OR join the previously started attempt.
853 324 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
854 : pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
855 : let mut rx = self.status.as_ref().unwrap().subscribe();
856 :
857 : {
858 : let current = rx.borrow_and_update();
859 : match &*current {
860 : Status::Resident => {
861 : // we might get lucky and evict this; continue
862 : }
863 : Status::Evicted | Status::Downloading => {
864 : // it is already evicted
865 : return Err(EvictionError::NotFound);
866 : }
867 : }
868 : }
869 :
870 : let strong = {
871 : match self.inner.get() {
872 : Some(mut either) => either.downgrade(),
873 : None => {
874 : // we already have a scheduled eviction, which just has not gotten to run yet.
875 : // it might still race with a read access, but that could also get cancelled,
876 : // so let's say this is not evictable.
877 : return Err(EvictionError::NotFound);
878 : }
879 : }
880 : };
881 :
882 : if strong.is_some() {
883 : // drop the DownloadedLayer outside of the holding the guard
884 : drop(strong);
885 :
886 : // idea here is that only one evicter should ever get to witness a strong reference,
887 : // which means whenever get_or_maybe_download upgrades a weak, it must mark up a
888 : // cancelled eviction and signal us, like it currently does.
889 : //
890 : // a second concurrent evict_and_wait will not see a strong reference.
891 : LAYER_IMPL_METRICS.inc_started_evictions();
892 : }
893 :
894 : let changed = rx.changed();
895 : let changed = tokio::time::timeout(timeout, changed).await;
896 :
897 : let Ok(changed) = changed else {
898 : return Err(EvictionError::Timeout);
899 : };
900 :
901 : let _: () = changed.expect("cannot be closed, because we are holding a strong reference");
902 :
903 : let current = rx.borrow_and_update();
904 :
905 : match &*current {
906 : // the easiest case
907 : Status::Evicted => Ok(()),
908 : // it surely was evicted in between, but then there was a new access now; we can't know
909 : // if it'll succeed so lets just call it evicted
910 : Status::Downloading => Ok(()),
911 : // either the download which was started after eviction completed already, or it was
912 : // never evicted
913 : Status::Resident => Err(EvictionError::Downloaded),
914 : }
915 : }
916 :
917 : /// Cancellation safe.
918 640333 : async fn get_or_maybe_download(
919 640333 : self: &Arc<Self>,
920 640333 : allow_download: bool,
921 640333 : ctx: Option<&RequestContext>,
922 640333 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
923 48 : let (weak, permit) = {
924 : // get_or_init_detached can:
925 : // - be fast (mutex lock) OR uncontested semaphore permit acquire
926 : // - be slow (wait for semaphore permit or closing)
927 640333 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
928 :
929 640333 : let locked = self
930 640333 : .inner
931 640333 : .get_or_init_detached()
932 9 : .await
933 640333 : .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
934 640333 :
935 640333 : scopeguard::ScopeGuard::into_inner(init_cancelled);
936 :
937 640285 : match locked {
938 : // this path could had been a RwLock::read
939 640285 : Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
940 0 : Ok(Ok((strong, _))) => {
941 0 : // when upgraded back, the Arc<DownloadedLayer> is still available, but
942 0 : // previously a `evict_and_wait` was received. this is the only place when we
943 0 : // send out an update without holding the InitPermit.
944 0 : //
945 0 : // note that we also have dropped the Guard; this is fine, because we just made
946 0 : // a state change and are holding a strong reference to be returned.
947 0 : self.status.as_ref().unwrap().send_replace(Status::Resident);
948 0 : LAYER_IMPL_METRICS
949 0 : .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
950 0 :
951 0 : return Ok(strong);
952 : }
953 24 : Ok(Err(guard)) => {
954 24 : // path to here: we won the eviction, the file should still be on the disk.
955 24 : let (weak, permit) = guard.take_and_deinit();
956 24 : (Some(weak), permit)
957 : }
958 24 : Err(permit) => (None, permit),
959 : }
960 : };
961 :
962 48 : if let Some(weak) = weak {
963 : // only drop the weak after dropping the heavier_once_cell guard
964 24 : assert!(
965 24 : matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
966 0 : "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
967 : );
968 24 : }
969 :
970 48 : let timeline = self
971 48 : .timeline
972 48 : .upgrade()
973 48 : .ok_or_else(|| DownloadError::TimelineShutdown)?;
974 :
975 : // count cancellations, which currently remain largely unexpected
976 48 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
977 :
978 : // check if we really need to be downloaded: this can happen if a read access won the
979 : // semaphore before eviction.
980 : //
981 : // if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
982 : // pending eviction will try to evict even upon finding an uninitialized `self.inner`.
983 48 : let needs_download = self
984 48 : .needs_download()
985 46 : .await
986 48 : .map_err(DownloadError::PreStatFailed);
987 48 :
988 48 : scopeguard::ScopeGuard::into_inner(init_cancelled);
989 :
990 48 : let needs_download = needs_download?;
991 :
992 48 : let Some(reason) = needs_download else {
993 : // the file is present locally because eviction has not had a chance to run yet
994 :
995 : #[cfg(test)]
996 24 : self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
997 6 : .await?;
998 :
999 18 : LAYER_IMPL_METRICS.inc_init_needed_no_download();
1000 18 :
1001 18 : return Ok(self.initialize_after_layer_is_on_disk(permit));
1002 : };
1003 :
1004 : // we must download; getting cancelled before spawning the download is not an issue as
1005 : // any still running eviction would not find anything to evict.
1006 :
1007 24 : if let NeedsDownload::NotFile(ft) = reason {
1008 0 : return Err(DownloadError::NotFile(ft));
1009 24 : }
1010 :
1011 24 : if let Some(ctx) = ctx {
1012 6 : self.check_expected_download(ctx)?;
1013 18 : }
1014 :
1015 24 : if !allow_download {
1016 : // this is only used from tests, but it is hard to test without the boolean
1017 6 : return Err(DownloadError::DownloadRequired);
1018 18 : }
1019 18 :
1020 18 : let download_ctx = ctx
1021 18 : .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
1022 18 : .unwrap_or(RequestContext::new(
1023 18 : TaskKind::LayerDownload,
1024 18 : DownloadBehavior::Download,
1025 18 : ));
1026 :
1027 18 : async move {
1028 18 : tracing::info!(%reason, "downloading on-demand");
1029 :
1030 18 : let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
1031 18 : let res = self
1032 18 : .download_init_and_wait(timeline, permit, download_ctx)
1033 30 : .await?;
1034 18 : scopeguard::ScopeGuard::into_inner(init_cancelled);
1035 18 : Ok(res)
1036 18 : }
1037 18 : .instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
1038 30 : .await
1039 640333 : }
1040 :
1041 : /// Nag or fail per RequestContext policy
1042 6 : fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
1043 6 : use crate::context::DownloadBehavior::*;
1044 6 : let b = ctx.download_behavior();
1045 6 : match b {
1046 6 : Download => Ok(()),
1047 : Warn | Error => {
1048 0 : tracing::info!(
1049 0 : "unexpectedly on-demand downloading for task kind {:?}",
1050 0 : ctx.task_kind()
1051 : );
1052 0 : crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
1053 :
1054 0 : let really_error =
1055 0 : matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
1056 :
1057 0 : if really_error {
1058 : // this check is only probablistic, seems like flakyness footgun
1059 0 : Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
1060 : } else {
1061 0 : Ok(())
1062 : }
1063 : }
1064 : }
1065 6 : }
1066 :
1067 : /// Actual download, at most one is executed at the time.
1068 18 : async fn download_init_and_wait(
1069 18 : self: &Arc<Self>,
1070 18 : timeline: Arc<Timeline>,
1071 18 : permit: heavier_once_cell::InitPermit,
1072 18 : ctx: RequestContext,
1073 18 : ) -> Result<Arc<DownloadedLayer>, DownloadError> {
1074 18 : debug_assert_current_span_has_tenant_and_timeline_id();
1075 18 :
1076 18 : let (tx, rx) = tokio::sync::oneshot::channel();
1077 18 :
1078 18 : let this: Arc<Self> = self.clone();
1079 :
1080 18 : let guard = timeline
1081 18 : .gate
1082 18 : .enter()
1083 18 : .map_err(|_| DownloadError::DownloadCancelled)?;
1084 :
1085 18 : Self::spawn(
1086 18 : async move {
1087 0 : let _guard = guard;
1088 0 :
1089 0 : // now that we have commited to downloading, send out an update to:
1090 0 : // - unhang any pending eviction
1091 0 : // - break out of evict_and_wait
1092 0 : this.status
1093 0 : .as_ref()
1094 0 : .unwrap()
1095 0 : .send_replace(Status::Downloading);
1096 18 :
1097 18 : #[cfg(test)]
1098 18 : this.failpoint(failpoints::FailpointKind::WaitBeforeDownloading)
1099 6 : .await
1100 18 : .unwrap();
1101 :
1102 265 : let res = this.download_and_init(timeline, permit, &ctx).await;
1103 :
1104 18 : if let Err(res) = tx.send(res) {
1105 0 : match res {
1106 0 : Ok(_res) => {
1107 0 : tracing::debug!("layer initialized, but caller has been cancelled");
1108 0 : LAYER_IMPL_METRICS.inc_init_completed_without_requester();
1109 : }
1110 0 : Err(e) => {
1111 0 : tracing::info!(
1112 0 : "layer file download failed, and caller has been cancelled: {e:?}"
1113 : );
1114 0 : LAYER_IMPL_METRICS.inc_download_failed_without_requester();
1115 : }
1116 : }
1117 18 : }
1118 18 : }
1119 18 : .in_current_span(),
1120 18 : );
1121 18 :
1122 30 : match rx.await {
1123 18 : Ok(Ok(res)) => Ok(res),
1124 : Ok(Err(remote_storage::DownloadError::Cancelled)) => {
1125 0 : Err(DownloadError::DownloadCancelled)
1126 : }
1127 0 : Ok(Err(_)) => Err(DownloadError::DownloadFailed),
1128 0 : Err(_gone) => Err(DownloadError::DownloadCancelled),
1129 : }
1130 18 : }
1131 :
1132 18 : async fn download_and_init(
1133 18 : self: &Arc<LayerInner>,
1134 18 : timeline: Arc<Timeline>,
1135 18 : permit: heavier_once_cell::InitPermit,
1136 18 : ctx: &RequestContext,
1137 18 : ) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
1138 18 : let result = timeline
1139 18 : .remote_client
1140 18 : .download_layer_file(
1141 18 : &self.desc.layer_name(),
1142 18 : &self.metadata(),
1143 18 : &self.path,
1144 18 : &timeline.cancel,
1145 18 : ctx,
1146 18 : )
1147 247 : .await;
1148 :
1149 18 : match result {
1150 18 : Ok(size) => {
1151 18 : assert_eq!(size, self.desc.file_size);
1152 :
1153 18 : match self.needs_download().await {
1154 0 : Ok(Some(reason)) => {
1155 0 : // this is really a bug in needs_download or remote timeline client
1156 0 : panic!("post-condition failed: needs_download returned {reason:?}");
1157 : }
1158 18 : Ok(None) => {
1159 18 : // as expected
1160 18 : }
1161 0 : Err(e) => {
1162 0 : panic!("post-condition failed: needs_download errored: {e:?}");
1163 : }
1164 : }
1165 :
1166 18 : tracing::info!(size=%self.desc.file_size, "on-demand download successful");
1167 18 : timeline
1168 18 : .metrics
1169 18 : .resident_physical_size_add(self.desc.file_size);
1170 18 : self.consecutive_failures.store(0, Ordering::Relaxed);
1171 18 :
1172 18 : let since_last_eviction = self
1173 18 : .last_evicted_at
1174 18 : .lock()
1175 18 : .unwrap()
1176 18 : .take()
1177 18 : .map(|ts| ts.elapsed());
1178 18 : if let Some(since_last_eviction) = since_last_eviction {
1179 18 : LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
1180 18 : }
1181 :
1182 18 : self.access_stats.record_residence_event();
1183 18 :
1184 18 : Ok(self.initialize_after_layer_is_on_disk(permit))
1185 : }
1186 0 : Err(e) => {
1187 0 : let consecutive_failures =
1188 0 : 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
1189 0 :
1190 0 : if timeline.cancel.is_cancelled() {
1191 : // If we're shutting down, drop out before logging the error
1192 0 : return Err(e);
1193 0 : }
1194 0 :
1195 0 : tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
1196 :
1197 0 : let backoff = utils::backoff::exponential_backoff_duration_seconds(
1198 0 : consecutive_failures.min(u32::MAX as usize) as u32,
1199 0 : 1.5,
1200 0 : 60.0,
1201 0 : );
1202 0 :
1203 0 : let backoff = std::time::Duration::from_secs_f64(backoff);
1204 :
1205 : tokio::select! {
1206 : _ = tokio::time::sleep(backoff) => {},
1207 : _ = timeline.cancel.cancelled() => {},
1208 : };
1209 :
1210 0 : Err(e)
1211 : }
1212 : }
1213 18 : }
1214 :
1215 : /// Initializes the `Self::inner` to a "resident" state.
1216 : ///
1217 : /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
1218 : /// before calling this method.
1219 : ///
1220 : /// If this method is ever made async, it needs to be cancellation safe so that no state
1221 : /// changes are made before we can write to the OnceCell in non-cancellable fashion.
1222 36 : fn initialize_after_layer_is_on_disk(
1223 36 : self: &Arc<LayerInner>,
1224 36 : permit: heavier_once_cell::InitPermit,
1225 36 : ) -> Arc<DownloadedLayer> {
1226 36 : debug_assert_current_span_has_tenant_and_timeline_id();
1227 36 :
1228 36 : // disable any scheduled but not yet running eviction deletions for this initialization
1229 36 : let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
1230 36 : self.status.as_ref().unwrap().send_replace(Status::Resident);
1231 36 :
1232 36 : let res = Arc::new(DownloadedLayer {
1233 36 : owner: Arc::downgrade(self),
1234 36 : kind: tokio::sync::OnceCell::default(),
1235 36 : version: next_version,
1236 36 : });
1237 36 :
1238 36 : let waiters = self.inner.initializer_count();
1239 36 : if waiters > 0 {
1240 0 : tracing::info!(waiters, "completing layer init for other tasks");
1241 36 : }
1242 :
1243 36 : let value = ResidentOrWantedEvicted::Resident(res.clone());
1244 36 :
1245 36 : self.inner.set(value, permit);
1246 36 :
1247 36 : res
1248 36 : }
1249 :
1250 72 : async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1251 72 : match tokio::fs::metadata(&self.path).await {
1252 48 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1253 24 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1254 0 : Err(e) => Err(e),
1255 : }
1256 72 : }
1257 :
1258 72 : fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
1259 72 : match self.path.metadata() {
1260 72 : Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
1261 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
1262 0 : Err(e) => Err(e),
1263 : }
1264 72 : }
1265 :
1266 120 : fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
1267 120 : // in future, this should include sha2-256 validation of the file.
1268 120 : if !m.is_file() {
1269 0 : Err(NeedsDownload::NotFile(m.file_type()))
1270 120 : } else if m.len() != self.desc.file_size {
1271 0 : Err(NeedsDownload::WrongSize {
1272 0 : actual: m.len(),
1273 0 : expected: self.desc.file_size,
1274 0 : })
1275 : } else {
1276 120 : Ok(())
1277 : }
1278 120 : }
1279 :
1280 0 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
1281 0 : let layer_name = self.desc.layer_name().to_string();
1282 0 :
1283 0 : let resident = self
1284 0 : .inner
1285 0 : .get()
1286 0 : .map(|rowe| rowe.is_likely_resident())
1287 0 : .unwrap_or(false);
1288 0 :
1289 0 : let access_stats = self.access_stats.as_api_model(reset);
1290 0 :
1291 0 : if self.desc.is_delta {
1292 0 : let lsn_range = &self.desc.lsn_range;
1293 0 :
1294 0 : HistoricLayerInfo::Delta {
1295 0 : layer_file_name: layer_name,
1296 0 : layer_file_size: self.desc.file_size,
1297 0 : lsn_start: lsn_range.start,
1298 0 : lsn_end: lsn_range.end,
1299 0 : remote: !resident,
1300 0 : access_stats,
1301 0 : l0: crate::tenant::layer_map::LayerMap::is_l0(
1302 0 : &self.layer_desc().key_range,
1303 0 : self.layer_desc().is_delta,
1304 0 : ),
1305 0 : }
1306 : } else {
1307 0 : let lsn = self.desc.image_layer_lsn();
1308 0 :
1309 0 : HistoricLayerInfo::Image {
1310 0 : layer_file_name: layer_name,
1311 0 : layer_file_size: self.desc.file_size,
1312 0 : lsn_start: lsn,
1313 0 : remote: !resident,
1314 0 : access_stats,
1315 0 : }
1316 : }
1317 0 : }
1318 :
1319 : /// `DownloadedLayer` is being dropped, so it calls this method.
1320 72 : fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
1321 : // we cannot know without inspecting LayerInner::inner if we should evict or not, even
1322 : // though here it is very likely
1323 72 : let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
1324 :
1325 : // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
1326 : // drop while the `self.inner` is being locked, leading to a deadlock.
1327 :
1328 72 : let start_evicting = async move {
1329 72 : #[cfg(test)]
1330 72 : self.failpoint(failpoints::FailpointKind::WaitBeforeStartingEvicting)
1331 42 : .await
1332 72 : .expect("failpoint should not have errored");
1333 72 :
1334 72 : tracing::debug!("eviction started");
1335 :
1336 72 : let res = self.wait_for_turn_and_evict(only_version).await;
1337 : // metrics: ignore the Ok branch, it is not done yet
1338 72 : if let Err(e) = res {
1339 18 : tracing::debug!(res=?Err::<(), _>(&e), "eviction completed");
1340 18 : LAYER_IMPL_METRICS.inc_eviction_cancelled(e);
1341 54 : }
1342 72 : };
1343 :
1344 72 : Self::spawn(start_evicting.instrument(span));
1345 72 : }
1346 :
1347 72 : async fn wait_for_turn_and_evict(
1348 72 : self: Arc<LayerInner>,
1349 72 : only_version: usize,
1350 72 : ) -> Result<(), EvictionCancelled> {
1351 138 : fn is_good_to_continue(status: &Status) -> Result<(), EvictionCancelled> {
1352 138 : use Status::*;
1353 138 : match status {
1354 132 : Resident => Ok(()),
1355 6 : Evicted => Err(EvictionCancelled::UnexpectedEvictedState),
1356 0 : Downloading => Err(EvictionCancelled::LostToDownload),
1357 : }
1358 138 : }
1359 :
1360 72 : let timeline = self
1361 72 : .timeline
1362 72 : .upgrade()
1363 72 : .ok_or(EvictionCancelled::TimelineGone)?;
1364 :
1365 72 : let mut rx = self
1366 72 : .status
1367 72 : .as_ref()
1368 72 : .expect("LayerInner cannot be dropped, holding strong ref")
1369 72 : .subscribe();
1370 72 :
1371 72 : is_good_to_continue(&rx.borrow_and_update())?;
1372 :
1373 66 : let Ok(gate) = timeline.gate.enter() else {
1374 0 : return Err(EvictionCancelled::TimelineGone);
1375 : };
1376 :
1377 54 : let permit = {
1378 : // we cannot just `std::fs::remove_file` because there might already be an
1379 : // get_or_maybe_download which will inspect filesystem and reinitialize. filesystem
1380 : // operations must be done while holding the heavier_once_cell::InitPermit
1381 66 : let mut wait = std::pin::pin!(self.inner.get_or_init_detached());
1382 :
1383 66 : let waited = loop {
1384 66 : // we must race to the Downloading starting, otherwise we would have to wait until the
1385 66 : // completion of the download. waiting for download could be long and hinder our
1386 66 : // efforts to alert on "hanging" evictions.
1387 66 : tokio::select! {
1388 : res = &mut wait => break res,
1389 : _ = rx.changed() => {
1390 : is_good_to_continue(&rx.borrow_and_update())?;
1391 : // two possibilities for Status::Resident:
1392 : // - the layer was found locally from disk by a read
1393 : // - we missed a bunch of updates and now the layer is
1394 : // again downloaded -- assume we'll fail later on with
1395 : // version check or AlreadyReinitialized
1396 : }
1397 66 : }
1398 66 : };
1399 :
1400 : // re-check now that we have the guard or permit; all updates should have happened
1401 : // while holding the permit.
1402 66 : is_good_to_continue(&rx.borrow_and_update())?;
1403 :
1404 : // the term deinitialize is used here, because we clearing out the Weak will eventually
1405 : // lead to deallocating the reference counted value, and the value we
1406 : // `Guard::take_and_deinit` is likely to be the last because the Weak is never cloned.
1407 66 : let (_weak, permit) = match waited {
1408 60 : Ok(guard) => {
1409 60 : match &*guard {
1410 54 : ResidentOrWantedEvicted::WantedEvicted(_weak, version)
1411 54 : if *version == only_version =>
1412 48 : {
1413 48 : tracing::debug!(version, "deinitializing matching WantedEvicted");
1414 48 : let (weak, permit) = guard.take_and_deinit();
1415 48 : (Some(weak), permit)
1416 : }
1417 6 : ResidentOrWantedEvicted::WantedEvicted(_, version) => {
1418 6 : // if we were not doing the version check, we would need to try to
1419 6 : // upgrade the weak here to see if it really is dropped. version check
1420 6 : // is done instead assuming that it is cheaper.
1421 6 : tracing::debug!(
1422 : version,
1423 : only_version,
1424 0 : "version mismatch, not deinitializing"
1425 : );
1426 6 : return Err(EvictionCancelled::VersionCheckFailed);
1427 : }
1428 : ResidentOrWantedEvicted::Resident(_) => {
1429 6 : return Err(EvictionCancelled::AlreadyReinitialized);
1430 : }
1431 : }
1432 : }
1433 6 : Err(permit) => {
1434 6 : tracing::debug!("continuing after cancelled get_or_maybe_download or eviction");
1435 6 : (None, permit)
1436 : }
1437 : };
1438 :
1439 54 : permit
1440 54 : };
1441 54 :
1442 54 : let span = tracing::Span::current();
1443 54 :
1444 54 : let spawned_at = std::time::Instant::now();
1445 54 :
1446 54 : // this is on purpose a detached spawn; we don't need to wait for it
1447 54 : //
1448 54 : // eviction completion reporting is the only thing hinging on this, and it can be just as
1449 54 : // well from a spawn_blocking thread.
1450 54 : //
1451 54 : // important to note that now that we've acquired the permit we have made sure the evicted
1452 54 : // file is either the exact `WantedEvicted` we wanted to evict, or uninitialized in case
1453 54 : // there are multiple evictions. The rest is not cancellable, and we've now commited to
1454 54 : // evicting.
1455 54 : //
1456 54 : // If spawn_blocking has a queue and maximum number of threads are in use, we could stall
1457 54 : // reads. We will need to add cancellation for that if necessary.
1458 54 : Self::spawn_blocking(move || {
1459 54 : let _span = span.entered();
1460 54 :
1461 54 : let res = self.evict_blocking(&timeline, &gate, &permit);
1462 54 :
1463 54 : let waiters = self.inner.initializer_count();
1464 54 :
1465 54 : if waiters > 0 {
1466 0 : LAYER_IMPL_METRICS.inc_evicted_with_waiters();
1467 54 : }
1468 :
1469 54 : let completed_in = spawned_at.elapsed();
1470 54 : LAYER_IMPL_METRICS.record_time_to_evict(completed_in);
1471 54 :
1472 54 : match res {
1473 54 : Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
1474 0 : Err(e) => LAYER_IMPL_METRICS.inc_eviction_cancelled(e),
1475 : }
1476 :
1477 54 : tracing::debug!(?res, elapsed_ms=%completed_in.as_millis(), %waiters, "eviction completed");
1478 54 : });
1479 54 :
1480 54 : Ok(())
1481 72 : }
1482 :
1483 : /// This is blocking only to do just one spawn_blocking hop compared to multiple via tokio::fs.
1484 54 : fn evict_blocking(
1485 54 : &self,
1486 54 : timeline: &Timeline,
1487 54 : _gate: &gate::GateGuard,
1488 54 : _permit: &heavier_once_cell::InitPermit,
1489 54 : ) -> Result<(), EvictionCancelled> {
1490 54 : // now accesses to `self.inner.get_or_init*` wait on the semaphore or the `_permit`
1491 54 :
1492 54 : match capture_mtime_and_remove(&self.path) {
1493 54 : Ok(local_layer_mtime) => {
1494 54 : let duration = SystemTime::now().duration_since(local_layer_mtime);
1495 54 : match duration {
1496 54 : Ok(elapsed) => {
1497 54 : let accessed_and_visible = self.access_stats.accessed()
1498 12 : && self.access_stats.visibility() == LayerVisibilityHint::Visible;
1499 54 : if accessed_and_visible {
1500 12 : // Only layers used for reads contribute to our "low residence" metric that is used
1501 12 : // to detect thrashing. Layers promoted for other reasons (e.g. compaction) are allowed
1502 12 : // to be rapidly evicted without contributing to this metric.
1503 12 : timeline
1504 12 : .metrics
1505 12 : .evictions_with_low_residence_duration
1506 12 : .read()
1507 12 : .unwrap()
1508 12 : .observe(elapsed);
1509 42 : }
1510 :
1511 54 : tracing::info!(
1512 0 : residence_millis = elapsed.as_millis(),
1513 0 : accessed_and_visible,
1514 0 : "evicted layer after known residence period"
1515 : );
1516 : }
1517 : Err(_) => {
1518 0 : tracing::info!("evicted layer after unknown residence period");
1519 : }
1520 : }
1521 54 : timeline.metrics.evictions.inc();
1522 54 : timeline
1523 54 : .metrics
1524 54 : .resident_physical_size_sub(self.desc.file_size);
1525 : }
1526 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1527 0 : tracing::error!(
1528 : layer_size = %self.desc.file_size,
1529 0 : "failed to evict layer from disk, it was already gone"
1530 : );
1531 0 : return Err(EvictionCancelled::FileNotFound);
1532 : }
1533 0 : Err(e) => {
1534 0 : // FIXME: this should probably be an abort
1535 0 : tracing::error!("failed to evict file from disk: {e:#}");
1536 0 : return Err(EvictionCancelled::RemoveFailed);
1537 : }
1538 : }
1539 :
1540 54 : self.access_stats.record_residence_event();
1541 54 :
1542 54 : self.status.as_ref().unwrap().send_replace(Status::Evicted);
1543 54 :
1544 54 : *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
1545 54 :
1546 54 : Ok(())
1547 54 : }
1548 :
1549 7209 : fn metadata(&self) -> LayerFileMetadata {
1550 7209 : LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
1551 7209 : }
1552 :
1553 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1554 : ///
1555 : /// Synchronizing with spawned tasks is very complicated otherwise.
1556 90 : fn spawn<F>(fut: F)
1557 90 : where
1558 90 : F: std::future::Future<Output = ()> + Send + 'static,
1559 90 : {
1560 90 : #[cfg(test)]
1561 90 : tokio::task::spawn(fut);
1562 90 : #[cfg(not(test))]
1563 90 : crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
1564 90 : }
1565 :
1566 : /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
1567 1443 : fn spawn_blocking<F>(f: F)
1568 1443 : where
1569 1443 : F: FnOnce() + Send + 'static,
1570 1443 : {
1571 1443 : #[cfg(test)]
1572 1443 : tokio::task::spawn_blocking(f);
1573 1443 : #[cfg(not(test))]
1574 1443 : crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
1575 1443 : }
1576 : }
1577 :
1578 54 : fn capture_mtime_and_remove(path: &Utf8Path) -> Result<SystemTime, std::io::Error> {
1579 54 : let m = path.metadata()?;
1580 54 : let local_layer_mtime = m.modified()?;
1581 54 : std::fs::remove_file(path)?;
1582 54 : Ok(local_layer_mtime)
1583 54 : }
1584 :
/// Reasons for [`LayerInner::evict_and_wait`] to report that the layer was not evicted.
#[derive(Debug, thiserror::Error)]
pub(crate) enum EvictionError {
    /// The layer was not resident when eviction was requested, or there was nothing
    /// initialized to evict.
    #[error("layer was already evicted")]
    NotFound,

    /// Evictions must always lose to downloads in races, and this time it happened.
    #[error("layer was downloaded instead")]
    Downloaded,

    /// No status change was observed within the timeout given to `evict_and_wait`.
    #[error("eviction did not happen within timeout")]
    Timeout,
}
1597 :
/// Error internal to the [`LayerInner::get_or_maybe_download`]
#[derive(Debug, thiserror::Error)]
pub(crate) enum DownloadError {
    /// The weak reference to the owning timeline could no longer be upgraded.
    #[error("timeline has already shutdown")]
    TimelineShutdown,
    /// The request context asks for `DownloadBehavior::Error` and the configuration does not
    /// downgrade that to a warning.
    #[error("context denies downloading")]
    ContextAndConfigReallyDeniesDownloads,
    /// A download would be needed, but the caller passed `allow_download = false`.
    #[error("downloading is really required but not allowed by this method")]
    DownloadRequired,
    /// Something exists at the local layer path, but it is not a file.
    #[error("layer path exists, but it is not a file: {0:?}")]
    NotFile(std::fs::FileType),
    /// Why no error here? Because it will be reported by page_service. We should have also
    /// done retries already.
    #[error("downloading evicted layer file failed")]
    DownloadFailed,
    /// The download was cancelled, the download task went away without reporting a result, or
    /// the timeline gate could not be entered.
    #[error("downloading failed, possibly for shutdown")]
    DownloadCancelled,
    /// The stat of the local path done before deciding on a download failed.
    #[error("pre-condition: stat before download failed")]
    PreStatFailed(#[source] std::io::Error),

    /// Error injected by a test failpoint.
    #[cfg(test)]
    #[error("failpoint: {0:?}")]
    Failpoint(failpoints::FailpointKind),
}
1622 :
impl DownloadError {
    /// Returns `true` only for [`DownloadError::DownloadCancelled`].
    pub(crate) fn is_cancelled(&self) -> bool {
        matches!(self, DownloadError::DownloadCancelled)
    }
}
1628 :
/// Why the local layer file cannot be used as is.
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
    /// Nothing exists at the local path.
    NotFound,
    /// The local path exists but is not a file.
    NotFile(std::fs::FileType),
    /// The local file length differs from the expected layer file size.
    WrongSize { actual: u64, expected: u64 },
}

impl std::fmt::Display for NeedsDownload {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotFound => f.write_str("file was not found"),
            Self::NotFile(file_type) => write!(f, "path is not a file; {file_type:?}"),
            Self::WrongSize { actual, expected } => {
                write!(f, "file size mismatch {actual} vs. {expected}")
            }
        }
    }
}
1647 :
/// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
pub(crate) struct DownloadedLayer {
    // Weak back-reference to the owning `LayerInner`; upgraded when this value is dropped in
    // order to notify the owner.
    owner: Weak<LayerInner>,
    // Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
    // DownloadedLayer
    kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
    // The owner's version this value was created for; passed to the owner on drop so that an
    // eviction only applies to the matching initialization.
    version: usize,
}
1656 :
1657 : impl std::fmt::Debug for DownloadedLayer {
1658 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1659 0 : f.debug_struct("DownloadedLayer")
1660 0 : // owner omitted because it is always "Weak"
1661 0 : .field("kind", &self.kind)
1662 0 : .field("version", &self.version)
1663 0 : .finish()
1664 0 : }
1665 : }
1666 :
1667 : impl Drop for DownloadedLayer {
1668 1791 : fn drop(&mut self) {
1669 1791 : if let Some(owner) = self.owner.upgrade() {
1670 72 : owner.on_downloaded_layer_drop(self.version);
1671 1719 : } else {
1672 1719 : // Layer::drop will handle cancelling the eviction; because of drop order and
1673 1719 : // `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
1674 1719 : }
1675 1791 : }
1676 : }
1677 :
1678 : impl DownloadedLayer {
1679 : /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`].
1680 : /// Failure to load the layer is sticky, i.e., future `get()` calls will return
1681 : /// the initial load failure immediately.
1682 : ///
1683 : /// `owner` parameter is a strong reference at the same `LayerInner` as the
1684 : /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
1685 : /// we will always have the LayerInner on the callstack, so we can just use it.
1686 641707 : async fn get<'a>(
1687 641707 : &'a self,
1688 641707 : owner: &Arc<LayerInner>,
1689 641707 : ctx: &RequestContext,
1690 641707 : ) -> anyhow::Result<&'a LayerKind> {
1691 641707 : let init = || async {
1692 3432 : assert_eq!(
1693 3432 : Weak::as_ptr(&self.owner),
1694 3432 : Arc::as_ptr(owner),
1695 3432 : "these are the same, just avoiding the upgrade"
1696 3432 : );
1697 3432 :
1698 3432 : let res = if owner.desc.is_delta {
1699 3432 : let summary = Some(delta_layer::Summary::expected(
1700 3126 : owner.desc.tenant_shard_id.tenant_id,
1701 3126 : owner.desc.timeline_id,
1702 3126 : owner.desc.key_range.clone(),
1703 3126 : owner.desc.lsn_range.clone(),
1704 3126 : ));
1705 3126 : delta_layer::DeltaLayerInner::load(
1706 3126 : &owner.path,
1707 3126 : summary,
1708 3126 : Some(owner.conf.max_vectored_read_bytes),
1709 3126 : ctx,
1710 3126 : )
1711 3432 : .await
1712 3432 : .map(LayerKind::Delta)
1713 3432 : } else {
1714 3432 : let lsn = owner.desc.image_layer_lsn();
1715 306 : let summary = Some(image_layer::Summary::expected(
1716 306 : owner.desc.tenant_shard_id.tenant_id,
1717 306 : owner.desc.timeline_id,
1718 306 : owner.desc.key_range.clone(),
1719 306 : lsn,
1720 306 : ));
1721 306 : image_layer::ImageLayerInner::load(
1722 306 : &owner.path,
1723 306 : lsn,
1724 306 : summary,
1725 306 : Some(owner.conf.max_vectored_read_bytes),
1726 306 : ctx,
1727 306 : )
1728 3432 : .await
1729 3432 : .map(LayerKind::Image)
1730 3432 : };
1731 3432 :
1732 3432 : match res {
1733 3432 : Ok(layer) => Ok(layer),
1734 3432 : Err(err) => {
1735 0 : LAYER_IMPL_METRICS.inc_permanent_loading_failures();
1736 0 : // We log this message once over the lifetime of `Self`
1737 0 : // => Ok and good to log backtrace and path here.
1738 0 : tracing::error!(
1739 3432 : "layer load failed, assuming permanent failure: {}: {err:?}",
1740 0 : owner.path
1741 3432 : );
1742 3432 : Err(err)
1743 3432 : }
1744 3432 : }
1745 3432 : };
1746 641707 : self.kind
1747 641707 : .get_or_init(init)
1748 3452 : .await
1749 641707 : .as_ref()
1750 641707 : // We already logged the full backtrace above, once. Don't repeat that here.
1751 641707 : .map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
1752 641707 : }
1753 :
1754 638797 : async fn get_values_reconstruct_data(
1755 638797 : &self,
1756 638797 : keyspace: KeySpace,
1757 638797 : lsn_range: Range<Lsn>,
1758 638797 : reconstruct_data: &mut ValuesReconstructState,
1759 638797 : owner: &Arc<LayerInner>,
1760 638797 : ctx: &RequestContext,
1761 638797 : ) -> Result<(), GetVectoredError> {
1762 638797 : use LayerKind::*;
1763 638797 :
1764 638797 : match self
1765 638797 : .get(owner, ctx)
1766 2576 : .await
1767 638797 : .map_err(GetVectoredError::Other)?
1768 : {
1769 614615 : Delta(d) => {
1770 614615 : d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
1771 267192 : .await
1772 : }
1773 24182 : Image(i) => {
1774 24182 : i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
1775 16331 : .await
1776 : }
1777 : }
1778 638797 : }
1779 :
1780 0 : async fn load_key_values(
1781 0 : &self,
1782 0 : owner: &Arc<LayerInner>,
1783 0 : ctx: &RequestContext,
1784 0 : ) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
1785 0 : use LayerKind::*;
1786 0 :
1787 0 : match self.get(owner, ctx).await? {
1788 0 : Delta(d) => d.load_key_values(ctx).await,
1789 0 : Image(i) => i.load_key_values(ctx).await,
1790 : }
1791 0 : }
1792 :
1793 12 : async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
1794 12 : use LayerKind::*;
1795 12 : match self.get(owner, ctx).await? {
1796 12 : Delta(d) => d.dump(ctx).await?,
1797 0 : Image(i) => i.dump(ctx).await?,
1798 : }
1799 :
1800 12 : Ok(())
1801 12 : }
1802 : }
1803 :
1804 : /// Wrapper around an actual layer implementation.
1805 : #[derive(Debug)]
1806 : enum LayerKind {
1807 : Delta(delta_layer::DeltaLayerInner),
1808 : Image(image_layer::ImageLayerInner),
1809 : }
1810 :
1811 : /// Guard for forcing a layer be resident while it exists.
1812 : #[derive(Clone)]
1813 : pub(crate) struct ResidentLayer {
1814 : owner: Layer,
1815 : downloaded: Arc<DownloadedLayer>,
1816 : }
1817 :
1818 : impl std::fmt::Display for ResidentLayer {
1819 5754 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1820 5754 : write!(f, "{}", self.owner)
1821 5754 : }
1822 : }
1823 :
1824 : impl std::fmt::Debug for ResidentLayer {
1825 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1826 0 : write!(f, "{}", self.owner)
1827 0 : }
1828 : }
1829 :
1830 : impl ResidentLayer {
1831 : /// Release the eviction guard, converting back into a plain [`Layer`].
1832 : ///
1833 : /// You can access the [`Layer`] also by using `as_ref`.
1834 1260 : pub(crate) fn drop_eviction_guard(self) -> Layer {
1835 1260 : self.into()
1836 1260 : }
1837 :
1838 : /// Loads all keys stored in the layer. Returns key, lsn and value size.
1839 2412 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1840 : pub(crate) async fn load_keys<'a>(
1841 : &'a self,
1842 : ctx: &RequestContext,
1843 : ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
1844 : use LayerKind::*;
1845 :
1846 : let owner = &self.owner.0;
1847 : match self.downloaded.get(owner, ctx).await? {
1848 : Delta(ref d) => {
1849 : // this is valid because the DownloadedLayer::kind is a OnceCell, not a
1850 : // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
1851 : // while it's being held.
1852 : self.owner.record_access(ctx);
1853 :
1854 : delta_layer::DeltaLayerInner::load_keys(d, ctx)
1855 : .await
1856 0 : .with_context(|| format!("Layer index is corrupted for {self}"))
1857 : }
1858 : Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
1859 : }
1860 : }
1861 :
1862 : /// Read all the keys in this layer which match the ShardIdentity, and write them all to
1863 : /// the provided writer. Return the number of keys written.
1864 48 : #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
1865 : pub(crate) async fn filter(
1866 : &self,
1867 : shard_identity: &ShardIdentity,
1868 : writer: &mut ImageLayerWriter,
1869 : ctx: &RequestContext,
1870 : ) -> Result<usize, CompactionError> {
1871 : use LayerKind::*;
1872 :
1873 : match self
1874 : .downloaded
1875 : .get(&self.owner.0, ctx)
1876 : .await
1877 : .map_err(CompactionError::Other)?
1878 : {
1879 : Delta(_) => {
1880 : return Err(CompactionError::Other(anyhow::anyhow!(format!(
1881 : "cannot filter() on a delta layer {self}"
1882 : ))));
1883 : }
1884 : Image(i) => i
1885 : .filter(shard_identity, writer, ctx)
1886 : .await
1887 : .map_err(CompactionError::Other),
1888 : }
1889 : }
1890 :
1891 : /// Returns the number of keys and values written to the writer.
1892 30 : pub(crate) async fn copy_delta_prefix(
1893 30 : &self,
1894 30 : writer: &mut super::delta_layer::DeltaLayerWriter,
1895 30 : until: Lsn,
1896 30 : ctx: &RequestContext,
1897 30 : ) -> anyhow::Result<usize> {
1898 30 : use LayerKind::*;
1899 30 :
1900 30 : let owner = &self.owner.0;
1901 30 :
1902 30 : match self.downloaded.get(owner, ctx).await? {
1903 30 : Delta(ref d) => d
1904 30 : .copy_prefix(writer, until, ctx)
1905 39 : .await
1906 30 : .with_context(|| format!("copy_delta_prefix until {until} of {self}")),
1907 0 : Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
1908 : }
1909 30 : }
1910 :
1911 4997 : pub(crate) fn local_path(&self) -> &Utf8Path {
1912 4997 : &self.owner.0.path
1913 4997 : }
1914 :
1915 4608 : pub(crate) fn metadata(&self) -> LayerFileMetadata {
1916 4608 : self.owner.metadata()
1917 4608 : }
1918 :
1919 : /// Cast the layer to a delta, return an error if it is an image layer.
1920 1524 : pub(crate) async fn get_as_delta(
1921 1524 : &self,
1922 1524 : ctx: &RequestContext,
1923 1524 : ) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
1924 1524 : use LayerKind::*;
1925 1524 : match self.downloaded.get(&self.owner.0, ctx).await? {
1926 1524 : Delta(ref d) => Ok(d),
1927 0 : Image(_) => Err(anyhow::anyhow!("image layer")),
1928 : }
1929 1524 : }
1930 :
1931 : /// Cast the layer to an image, return an error if it is a delta layer.
1932 114 : pub(crate) async fn get_as_image(
1933 114 : &self,
1934 114 : ctx: &RequestContext,
1935 114 : ) -> anyhow::Result<&image_layer::ImageLayerInner> {
1936 114 : use LayerKind::*;
1937 114 : match self.downloaded.get(&self.owner.0, ctx).await? {
1938 114 : Image(ref d) => Ok(d),
1939 0 : Delta(_) => Err(anyhow::anyhow!("delta layer")),
1940 : }
1941 114 : }
1942 : }
1943 :
1944 : impl AsLayerDesc for ResidentLayer {
1945 17723 : fn layer_desc(&self) -> &PersistentLayerDesc {
1946 17723 : self.owner.layer_desc()
1947 17723 : }
1948 : }
1949 :
1950 : impl AsRef<Layer> for ResidentLayer {
1951 5760 : fn as_ref(&self) -> &Layer {
1952 5760 : &self.owner
1953 5760 : }
1954 : }
1955 :
1956 : /// Drop the eviction guard.
1957 : impl From<ResidentLayer> for Layer {
1958 1260 : fn from(value: ResidentLayer) -> Self {
1959 1260 : value.owner
1960 1260 : }
1961 : }
1962 :
1963 : use metrics::IntCounter;
1964 :
1965 : pub(crate) struct LayerImplMetrics {
1966 : started_evictions: IntCounter,
1967 : completed_evictions: IntCounter,
1968 : cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
1969 :
1970 : started_deletes: IntCounter,
1971 : completed_deletes: IntCounter,
1972 : failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
1973 :
1974 : rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
1975 : inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
1976 : redownload_after: metrics::Histogram,
1977 : time_to_evict: metrics::Histogram,
1978 : }
1979 :
1980 : impl Default for LayerImplMetrics {
1981 126 : fn default() -> Self {
1982 126 : use enum_map::Enum;
1983 126 :
1984 126 : // reminder: these will be pageserver_layer_* with "_total" suffix
1985 126 :
1986 126 : let started_evictions = metrics::register_int_counter!(
1987 : "pageserver_layer_started_evictions",
1988 : "Evictions started in the Layer implementation"
1989 : )
1990 126 : .unwrap();
1991 126 : let completed_evictions = metrics::register_int_counter!(
1992 : "pageserver_layer_completed_evictions",
1993 : "Evictions completed in the Layer implementation"
1994 : )
1995 126 : .unwrap();
1996 126 :
1997 126 : let cancelled_evictions = metrics::register_int_counter_vec!(
1998 : "pageserver_layer_cancelled_evictions_count",
1999 : "Different reasons for evictions to have been cancelled or failed",
2000 : &["reason"]
2001 : )
2002 126 : .unwrap();
2003 126 :
2004 1134 : let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2005 1134 : let reason = EvictionCancelled::from_usize(i);
2006 1134 : let s = reason.as_str();
2007 1134 : cancelled_evictions.with_label_values(&[s])
2008 1134 : }));
2009 126 :
2010 126 : let started_deletes = metrics::register_int_counter!(
2011 : "pageserver_layer_started_deletes",
2012 : "Deletions on drop pending in the Layer implementation"
2013 : )
2014 126 : .unwrap();
2015 126 : let completed_deletes = metrics::register_int_counter!(
2016 : "pageserver_layer_completed_deletes",
2017 : "Deletions on drop completed in the Layer implementation"
2018 : )
2019 126 : .unwrap();
2020 126 :
2021 126 : let failed_deletes = metrics::register_int_counter_vec!(
2022 : "pageserver_layer_failed_deletes_count",
2023 : "Different reasons for deletions on drop to have failed",
2024 : &["reason"]
2025 : )
2026 126 : .unwrap();
2027 126 :
2028 252 : let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2029 252 : let reason = DeleteFailed::from_usize(i);
2030 252 : let s = reason.as_str();
2031 252 : failed_deletes.with_label_values(&[s])
2032 252 : }));
2033 126 :
2034 126 : let rare_counters = metrics::register_int_counter_vec!(
2035 : "pageserver_layer_assumed_rare_count",
2036 : "Times unexpected or assumed rare event happened",
2037 : &["event"]
2038 : )
2039 126 : .unwrap();
2040 126 :
2041 882 : let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
2042 882 : let event = RareEvent::from_usize(i);
2043 882 : let s = event.as_str();
2044 882 : rare_counters.with_label_values(&[s])
2045 882 : }));
2046 126 :
2047 126 : let inits_cancelled = metrics::register_int_counter!(
2048 : "pageserver_layer_inits_cancelled_count",
2049 : "Times Layer initialization was cancelled",
2050 : )
2051 126 : .unwrap();
2052 126 :
2053 126 : let redownload_after = {
2054 126 : let minute = 60.0;
2055 126 : let hour = 60.0 * minute;
2056 : metrics::register_histogram!(
2057 : "pageserver_layer_redownloaded_after",
2058 : "Time between evicting and re-downloading.",
2059 : vec![
2060 : 10.0,
2061 : 30.0,
2062 : minute,
2063 : 5.0 * minute,
2064 : 15.0 * minute,
2065 : 30.0 * minute,
2066 : hour,
2067 : 12.0 * hour,
2068 : ]
2069 : )
2070 126 : .unwrap()
2071 126 : };
2072 126 :
2073 126 : let time_to_evict = metrics::register_histogram!(
2074 : "pageserver_layer_eviction_held_permit_seconds",
2075 : "Time eviction held the permit.",
2076 : vec![0.001, 0.010, 0.100, 0.500, 1.000, 5.000]
2077 : )
2078 126 : .unwrap();
2079 126 :
2080 126 : Self {
2081 126 : started_evictions,
2082 126 : completed_evictions,
2083 126 : cancelled_evictions,
2084 126 :
2085 126 : started_deletes,
2086 126 : completed_deletes,
2087 126 : failed_deletes,
2088 126 :
2089 126 : rare_counters,
2090 126 : inits_cancelled,
2091 126 : redownload_after,
2092 126 : time_to_evict,
2093 126 : }
2094 126 : }
2095 : }
2096 :
2097 : impl LayerImplMetrics {
2098 78 : fn inc_started_evictions(&self) {
2099 78 : self.started_evictions.inc();
2100 78 : }
2101 54 : fn inc_completed_evictions(&self) {
2102 54 : self.completed_evictions.inc();
2103 54 : }
2104 24 : fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
2105 24 : self.cancelled_evictions[reason].inc()
2106 24 : }
2107 :
2108 1398 : fn inc_started_deletes(&self) {
2109 1398 : self.started_deletes.inc();
2110 1398 : }
2111 1387 : fn inc_completed_deletes(&self) {
2112 1387 : self.completed_deletes.inc();
2113 1387 : }
2114 0 : fn inc_deletes_failed(&self, reason: DeleteFailed) {
2115 0 : self.failed_deletes[reason].inc();
2116 0 : }
2117 :
2118 : /// Counted separately from failed layer deletes because we will complete the layer deletion
2119 : /// attempt regardless of failure to delete local file.
2120 0 : fn inc_delete_removes_failed(&self) {
2121 0 : self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
2122 0 : }
2123 :
2124 : /// Expected rare just as cancellations are rare, but we could have cancellations separate from
2125 : /// the single caller which can start the download, so use this counter to separate them.
2126 0 : fn inc_init_completed_without_requester(&self) {
2127 0 : self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
2128 0 : }
2129 :
2130 : /// Expected rare because cancellations are unexpected, and failures are unexpected
2131 0 : fn inc_download_failed_without_requester(&self) {
2132 0 : self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
2133 0 : }
2134 :
2135 : /// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
2136 : ///
2137 : /// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
2138 : /// Option.
2139 0 : fn inc_raced_wanted_evicted_accesses(&self) {
2140 0 : self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
2141 0 : }
2142 :
2143 : /// These are only expected for [`Self::inc_init_cancelled`] amount when
2144 : /// running with remote storage.
2145 18 : fn inc_init_needed_no_download(&self) {
2146 18 : self.rare_counters[RareEvent::InitWithoutDownload].inc();
2147 18 : }
2148 :
2149 : /// Expected rare because all layer files should be readable and good
2150 0 : fn inc_permanent_loading_failures(&self) {
2151 0 : self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
2152 0 : }
2153 :
2154 0 : fn inc_init_cancelled(&self) {
2155 0 : self.inits_cancelled.inc()
2156 0 : }
2157 :
2158 18 : fn record_redownloaded_after(&self, duration: std::time::Duration) {
2159 18 : self.redownload_after.observe(duration.as_secs_f64())
2160 18 : }
2161 :
2162 : /// This would be bad if it ever happened, or mean extreme disk pressure. We should probably
2163 : /// instead cancel eviction if we would have read waiters. We cannot however separate reads
2164 : /// from other evictions, so this could have noise as well.
2165 0 : fn inc_evicted_with_waiters(&self) {
2166 0 : self.rare_counters[RareEvent::EvictedWithWaiters].inc();
2167 0 : }
2168 :
2169 : /// Recorded at least initially as the permit is now acquired in async context before
2170 : /// spawn_blocking action.
2171 54 : fn record_time_to_evict(&self, duration: std::time::Duration) {
2172 54 : self.time_to_evict.observe(duration.as_secs_f64())
2173 54 : }
2174 : }
2175 :
2176 : #[derive(Debug, Clone, Copy, enum_map::Enum)]
2177 : enum EvictionCancelled {
2178 : LayerGone,
2179 : TimelineGone,
2180 : VersionCheckFailed,
2181 : FileNotFound,
2182 : RemoveFailed,
2183 : AlreadyReinitialized,
2184 : /// Not evicted because of a pending reinitialization
2185 : LostToDownload,
2186 : /// After eviction, there was a new layer access which cancelled the eviction.
2187 : UpgradedBackOnAccess,
2188 : UnexpectedEvictedState,
2189 : }
2190 :
2191 : impl EvictionCancelled {
2192 1134 : fn as_str(&self) -> &'static str {
2193 1134 : match self {
2194 126 : EvictionCancelled::LayerGone => "layer_gone",
2195 126 : EvictionCancelled::TimelineGone => "timeline_gone",
2196 126 : EvictionCancelled::VersionCheckFailed => "version_check_fail",
2197 126 : EvictionCancelled::FileNotFound => "file_not_found",
2198 126 : EvictionCancelled::RemoveFailed => "remove_failed",
2199 126 : EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
2200 126 : EvictionCancelled::LostToDownload => "lost_to_download",
2201 126 : EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
2202 126 : EvictionCancelled::UnexpectedEvictedState => "unexpected_evicted_state",
2203 : }
2204 1134 : }
2205 : }
2206 :
2207 : #[derive(enum_map::Enum)]
2208 : enum DeleteFailed {
2209 : TimelineGone,
2210 : DeleteSchedulingFailed,
2211 : }
2212 :
2213 : impl DeleteFailed {
2214 252 : fn as_str(&self) -> &'static str {
2215 252 : match self {
2216 126 : DeleteFailed::TimelineGone => "timeline_gone",
2217 126 : DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
2218 : }
2219 252 : }
2220 : }
2221 :
2222 : #[derive(enum_map::Enum)]
2223 : enum RareEvent {
2224 : RemoveOnDropFailed,
2225 : InitCompletedWithoutRequester,
2226 : DownloadFailedWithoutRequester,
2227 : UpgradedWantedEvicted,
2228 : InitWithoutDownload,
2229 : PermanentLoadingFailure,
2230 : EvictedWithWaiters,
2231 : }
2232 :
2233 : impl RareEvent {
2234 882 : fn as_str(&self) -> &'static str {
2235 882 : use RareEvent::*;
2236 882 :
2237 882 : match self {
2238 126 : RemoveOnDropFailed => "remove_on_drop_failed",
2239 126 : InitCompletedWithoutRequester => "init_completed_without",
2240 126 : DownloadFailedWithoutRequester => "download_failed_without",
2241 126 : UpgradedWantedEvicted => "raced_wanted_evicted",
2242 126 : InitWithoutDownload => "init_needed_no_download",
2243 126 : PermanentLoadingFailure => "permanent_loading_failure",
2244 126 : EvictedWithWaiters => "evicted_with_waiters",
2245 : }
2246 882 : }
2247 : }
2248 :
2249 : pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
2250 : once_cell::sync::Lazy::new(LayerImplMetrics::default);
|