LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: 98683a8629f0f7f0031d02e04512998d589d76ea.info Lines: 82.1 % 392 322
Test Date: 2025-04-11 16:58:57 Functions: 81.4 % 43 35

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::sync::Arc;
       3              : 
       4              : use anyhow::{Context, bail, ensure};
       5              : use itertools::Itertools;
       6              : use pageserver_api::keyspace::KeySpace;
       7              : use pageserver_api::shard::TenantShardId;
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::trace;
      10              : use utils::id::TimelineId;
      11              : use utils::lsn::{AtomicLsn, Lsn};
      12              : 
      13              : use super::{LayerFringe, ReadableLayer, TimelineWriterState};
      14              : use crate::config::PageServerConf;
      15              : use crate::context::RequestContext;
      16              : use crate::metrics::TimelineMetrics;
      17              : use crate::tenant::layer_map::{BatchedUpdates, LayerMap, SearchResult};
      18              : use crate::tenant::storage_layer::{
      19              :     AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
      20              :     PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
      21              : };
      22              : 
      23              : /// Provides semantic APIs to manipulate the layer map.
      24              : pub(crate) enum LayerManager {
      25              :     /// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate
      26              :     /// the layers.
      27              :     Open(OpenLayerManager),
      28              :     /// Shutdown layer manager where there are no more in-memory layers and persistent layers are
      29              :     /// read-only.
      30              :     Closed {
      31              :         layers: HashMap<PersistentLayerKey, Layer>,
      32              :     },
      33              : }
      34              : 
      35              : impl Default for LayerManager {
      36          924 :     fn default() -> Self {
      37          924 :         LayerManager::Open(OpenLayerManager::default())
      38          924 :     }
      39              : }
      40              : 
      41              : impl LayerManager {
      42      1694841 :     fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
      43      1694841 :         match weak {
      44       480457 :             ReadableLayerWeak::PersistentLayer(desc) => {
      45       480457 :                 ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
      46              :             }
      47      1214384 :             ReadableLayerWeak::InMemoryLayer(desc) => {
      48      1214384 :                 let inmem = self
      49      1214384 :                     .layer_map()
      50      1214384 :                     .expect("no concurrent shutdown")
      51      1214384 :                     .in_memory_layer(&desc);
      52      1214384 :                 ReadableLayer::InMemoryLayer(inmem)
      53              :             }
      54              :         }
      55      1694841 :     }
      56              : 
      57       482377 :     pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
      58       482377 :         // The assumption for the `expect()` is that all code maintains the following invariant:
      59       482377 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
      60       482377 :         self.try_get_from_key(key)
      61       482377 :             .with_context(|| format!("get layer from key: {key}"))
      62       482377 :             .expect("not found")
      63       482377 :             .clone()
      64       482377 :     }
      65              : 
      66       482469 :     pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
      67       482469 :         self.layers().get(key)
      68       482469 :     }
      69              : 
      70       482333 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      71       482333 :         self.get_from_key(&desc.key())
      72       482333 :     }
      73              : 
      74              :     /// Get an immutable reference to the layer map.
      75              :     ///
      76              :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      77              :     /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
      78      3369808 :     pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
      79              :         use LayerManager::*;
      80      3369808 :         match self {
      81      3369808 :             Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
      82            0 :             Closed { .. } => Err(Shutdown),
      83              :         }
      84      3369808 :     }
      85              : 
      86         9972 :     pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
      87              :         use LayerManager::*;
      88              : 
      89         9972 :         match self {
      90         9972 :             Open(open) => Ok(open),
      91            0 :             Closed { .. } => Err(Shutdown),
      92              :         }
      93         9972 :     }
      94              : 
      95              :     /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
      96              :     /// order to allow shutdown to complete.
      97              :     ///
      98              :     /// If there was a want to flush in-memory layers, it must have happened earlier.
      99           20 :     pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
     100              :         use LayerManager::*;
     101           20 :         match self {
     102              :             Open(OpenLayerManager {
     103           20 :                 layer_map,
     104           20 :                 layer_fmgr: LayerFileManager(hashmap),
     105           20 :             }) => {
     106           20 :                 // NB: no need to decrement layer metrics; metrics are removed on timeline shutdown.
     107           20 :                 let open = layer_map.open_layer.take();
     108           20 :                 let frozen = layer_map.frozen_layers.len();
     109           20 :                 let taken_writer_state = writer_state.take();
     110           20 :                 tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
     111           20 :                 let layers = std::mem::take(hashmap);
     112           20 :                 *self = Closed { layers };
     113           20 :                 assert_eq!(open.is_some(), taken_writer_state.is_some());
     114              :             }
     115              :             Closed { .. } => {
     116            0 :                 tracing::debug!("ignoring multiple shutdowns on layer manager")
     117              :             }
     118              :         }
     119           20 :     }
     120              : 
     121              :     /// Sum up the historic layer sizes
     122            0 :     pub(crate) fn layer_size_sum(&self) -> u64 {
     123            0 :         self.layers()
     124            0 :             .values()
     125            0 :             .map(|l| l.layer_desc().file_size)
     126            0 :             .sum()
     127            0 :     }
     128              : 
     129           80 :     pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
     130          227 :         self.layers().values().filter(|l| l.is_likely_resident())
     131           80 :     }
     132              : 
     133            0 :     pub(crate) fn visible_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
     134            0 :         self.layers()
     135            0 :             .values()
     136            0 :             .filter(|l| l.visibility() == LayerVisibilityHint::Visible)
     137            0 :     }
     138              : 
     139          616 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     140          616 :         self.contains_key(&layer.layer_desc().key())
     141          616 :     }
     142              : 
     143          820 :     pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
     144          820 :         self.layers().contains_key(key)
     145          820 :     }
     146              : 
     147            0 :     pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
     148            0 :         self.layers().keys().cloned().collect_vec()
     149            0 :     }
     150              : 
     151              :     /// Update the [`LayerFringe`] of a read request
     152              :     ///
     153              :     /// Take a key space at a given LSN and query the layer map below each range
     154              :     /// of the key space to find the next layers to visit.
     155      2150268 :     pub(crate) fn update_search_fringe(
     156      2150268 :         &self,
     157      2150268 :         keyspace: &KeySpace,
     158      2150268 :         cont_lsn: Lsn,
     159      2150268 :         fringe: &mut LayerFringe,
     160      2150268 :     ) -> Result<(), Shutdown> {
     161      2150268 :         let map = self.layer_map()?;
     162              : 
     163      2151676 :         for range in keyspace.ranges.iter() {
     164      2151676 :             let results = map.range_search(range.clone(), cont_lsn);
     165      2151676 :             results
     166      2151676 :                 .found
     167      2151676 :                 .into_iter()
     168      2151676 :                 .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
     169      1694841 :                     (
     170      1694841 :                         self.upgrade(layer),
     171      1694841 :                         keyspace_accum.to_keyspace(),
     172      1694841 :                         lsn_floor..cont_lsn,
     173      1694841 :                     )
     174      2151676 :                 })
     175      2151676 :                 .for_each(|(layer, keyspace, lsn_range)| fringe.update(layer, keyspace, lsn_range));
     176      2151676 :         }
     177              : 
     178      2150268 :         Ok(())
     179      2150268 :     }
     180              : 
     181       483369 :     fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
     182              :         use LayerManager::*;
     183       483369 :         match self {
     184       483369 :             Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
     185            0 :             Closed { layers } => layers,
     186              :         }
     187       483369 :     }
     188              : }
     189              : 
     190              : #[derive(Default)]
     191              : pub(crate) struct OpenLayerManager {
     192              :     layer_map: LayerMap,
     193              :     layer_fmgr: LayerFileManager<Layer>,
     194              : }
     195              : 
     196              : impl std::fmt::Debug for OpenLayerManager {
     197            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     198            0 :         f.debug_struct("OpenLayerManager")
     199            0 :             .field("layer_count", &self.layer_fmgr.0.len())
     200            0 :             .finish()
     201            0 :     }
     202              : }
     203              : 
     204              : #[derive(Debug, thiserror::Error)]
     205              : #[error("layer manager has been shutdown")]
     206              : pub(crate) struct Shutdown;
     207              : 
     208              : impl OpenLayerManager {
     209              :     /// Called from `load_layer_map`. Initialize the layer manager with:
     210              :     /// 1. all on-disk layers
     211              :     /// 2. next open layer (with disk disk_consistent_lsn LSN)
     212           12 :     pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
     213           12 :         let mut updates = self.layer_map.batch_update();
     214           44 :         for layer in layers {
     215           32 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
     216           32 :         }
     217           12 :         updates.flush();
     218           12 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
     219           12 :     }
     220              : 
     221              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
     222          912 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
     223          912 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
     224          912 :     }
     225              : 
     226              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the
     227              :     /// current open layer, called within `get_layer_for_write`.
     228              :     #[allow(clippy::too_many_arguments)]
     229         2612 :     pub(crate) async fn get_layer_for_write(
     230         2612 :         &mut self,
     231         2612 :         lsn: Lsn,
     232         2612 :         conf: &'static PageServerConf,
     233         2612 :         timeline_id: TimelineId,
     234         2612 :         tenant_shard_id: TenantShardId,
     235         2612 :         gate: &utils::sync::gate::Gate,
     236         2612 :         cancel: &CancellationToken,
     237         2612 :         ctx: &RequestContext,
     238         2612 :     ) -> anyhow::Result<Arc<InMemoryLayer>> {
     239         2612 :         ensure!(lsn.is_aligned());
     240              : 
     241              :         // Do we have a layer open for writing already?
     242         2612 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
     243            0 :             if open_layer.get_lsn_range().start > lsn {
     244            0 :                 bail!(
     245            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
     246            0 :                     open_layer.get_lsn_range().start,
     247            0 :                     lsn
     248            0 :                 );
     249            0 :             }
     250            0 : 
     251            0 :             Arc::clone(open_layer)
     252              :         } else {
     253              :             // No writeable layer yet. Create one.
     254         2612 :             let start_lsn = self
     255         2612 :                 .layer_map
     256         2612 :                 .next_open_layer_at
     257         2612 :                 .context("No next open layer found")?;
     258              : 
     259         2612 :             trace!(
     260            0 :                 "creating in-memory layer at {}/{} for record at {}",
     261              :                 timeline_id, start_lsn, lsn
     262              :             );
     263              : 
     264         2612 :             let new_layer = InMemoryLayer::create(
     265         2612 :                 conf,
     266         2612 :                 timeline_id,
     267         2612 :                 tenant_shard_id,
     268         2612 :                 start_lsn,
     269         2612 :                 gate,
     270         2612 :                 cancel,
     271         2612 :                 ctx,
     272         2612 :             )
     273         2612 :             .await?;
     274         2612 :             let layer = Arc::new(new_layer);
     275         2612 : 
     276         2612 :             self.layer_map.open_layer = Some(layer.clone());
     277         2612 :             self.layer_map.next_open_layer_at = None;
     278         2612 : 
     279         2612 :             layer
     280              :         };
     281              : 
     282         2612 :         Ok(layer)
     283         2612 :     }
     284              : 
     285              :     /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
     286              :     ///
     287              :     /// Returns true if anything was frozen.
     288         2420 :     pub(super) async fn try_freeze_in_memory_layer(
     289         2420 :         &mut self,
     290         2420 :         lsn: Lsn,
     291         2420 :         last_freeze_at: &AtomicLsn,
     292         2420 :         write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
     293         2420 :         metrics: &TimelineMetrics,
     294         2420 :     ) -> bool {
     295         2420 :         let Lsn(last_record_lsn) = lsn;
     296         2420 :         let end_lsn = Lsn(last_record_lsn + 1);
     297              : 
     298         2420 :         let froze = if let Some(open_layer) = &self.layer_map.open_layer {
     299         2364 :             let open_layer_rc = Arc::clone(open_layer);
     300         2364 :             open_layer.freeze(end_lsn).await;
     301              : 
     302              :             // Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
     303              :             // TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
     304              :             // reference to the timeline metrics. Other methods use a metrics borrow as well.
     305         2364 :             metrics.inc_frozen_layer(open_layer);
     306         2364 : 
     307         2364 :             // The layer is no longer open, update the layer map to reflect this.
     308         2364 :             // We will replace it with on-disk historics below.
     309         2364 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     310         2364 :             self.layer_map.open_layer = None;
     311         2364 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     312         2364 : 
     313         2364 :             true
     314              :         } else {
     315           56 :             false
     316              :         };
     317              : 
     318              :         // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
     319              :         // accounts for regions in the LSN range where we might have ingested no data due to sharding.
     320         2420 :         last_freeze_at.store(end_lsn);
     321         2420 : 
     322         2420 :         // the writer state must no longer have a reference to the frozen layer
     323         2420 :         let taken = write_lock.take();
     324         2420 :         assert_eq!(
     325         2420 :             froze,
     326         2420 :             taken.is_some(),
     327            0 :             "should only had frozen a layer when TimelineWriterState existed"
     328              :         );
     329              : 
     330         2420 :         froze
     331         2420 :     }
     332              : 
     333              :     /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
     334         1156 :     pub(crate) fn track_new_image_layers(
     335         1156 :         &mut self,
     336         1156 :         image_layers: &[ResidentLayer],
     337         1156 :         metrics: &TimelineMetrics,
     338         1156 :     ) {
     339         1156 :         let mut updates = self.layer_map.batch_update();
     340         1660 :         for layer in image_layers {
     341          504 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     342          504 : 
     343          504 :             // record these here instead of Layer::finish_creating because otherwise partial
     344          504 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     345          504 :             // is that all layers need to be created before metrics are updated.
     346          504 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     347          504 :         }
     348         1156 :         updates.flush();
     349         1156 :     }
     350              : 
     351              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     352         2364 :     pub(crate) fn finish_flush_l0_layer(
     353         2364 :         &mut self,
     354         2364 :         delta_layer: Option<&ResidentLayer>,
     355         2364 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     356         2364 :         metrics: &TimelineMetrics,
     357         2364 :     ) {
     358         2364 :         let inmem = self
     359         2364 :             .layer_map
     360         2364 :             .frozen_layers
     361         2364 :             .pop_front()
     362         2364 :             .expect("there must be a inmem layer to flush");
     363         2364 :         metrics.dec_frozen_layer(&inmem);
     364         2364 : 
     365         2364 :         // Only one task may call this function at a time (for this
     366         2364 :         // timeline). If two tasks tried to flush the same frozen
     367         2364 :         // layer to disk at the same time, that would not work.
     368         2364 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     369              : 
     370         2364 :         if let Some(l) = delta_layer {
     371         1936 :             let mut updates = self.layer_map.batch_update();
     372         1936 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     373         1936 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     374         1936 :             updates.flush();
     375         1936 :         }
     376         2364 :     }
     377              : 
     378              :     /// Called when compaction is completed.
     379           56 :     pub(crate) fn finish_compact_l0(
     380           56 :         &mut self,
     381           56 :         compact_from: &[Layer],
     382           56 :         compact_to: &[ResidentLayer],
     383           56 :         metrics: &TimelineMetrics,
     384           56 :     ) {
     385           56 :         let mut updates = self.layer_map.batch_update();
     386          672 :         for l in compact_to {
     387          616 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     388          616 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     389          616 :         }
     390          860 :         for l in compact_from {
     391          804 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     392          804 :         }
     393           56 :         updates.flush();
     394           56 :     }
     395              : 
     396              :     /// Called when a GC-compaction is completed.
     397           96 :     pub(crate) fn finish_gc_compaction(
     398           96 :         &mut self,
     399           96 :         compact_from: &[Layer],
     400           96 :         compact_to: &[ResidentLayer],
     401           96 :         metrics: &TimelineMetrics,
     402           96 :     ) {
     403           96 :         // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.
     404           96 : 
     405           96 :         // Match the old layers with the new layers
     406           96 :         let mut add_layers = HashMap::new();
     407           96 :         let mut rewrite_layers = HashMap::new();
     408           96 :         let mut drop_layers = HashMap::new();
     409          304 :         for layer in compact_from {
     410          208 :             drop_layers.insert(layer.layer_desc().key(), layer.clone());
     411          208 :         }
     412          204 :         for layer in compact_to {
     413          108 :             if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
     414            0 :                 rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
     415          108 :             } else {
     416          108 :                 add_layers.insert(layer.layer_desc().key(), layer.clone());
     417          108 :             }
     418              :         }
     419           96 :         let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
     420           96 :         let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
     421           96 :         let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();
     422           96 : 
     423           96 :         self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
     424           96 :     }
     425              : 
     426              :     /// Called post-compaction when some previous generation image layers were trimmed.
     427            0 :     pub fn rewrite_layers(
     428            0 :         &mut self,
     429            0 :         rewrite_layers: &[(Layer, ResidentLayer)],
     430            0 :         drop_layers: &[Layer],
     431            0 :         metrics: &TimelineMetrics,
     432            0 :     ) {
     433            0 :         self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
     434            0 :     }
     435              : 
     436           96 :     fn rewrite_layers_inner(
     437           96 :         &mut self,
     438           96 :         rewrite_layers: &[(Layer, ResidentLayer)],
     439           96 :         drop_layers: &[Layer],
     440           96 :         add_layers: &[ResidentLayer],
     441           96 :         metrics: &TimelineMetrics,
     442           96 :     ) {
     443           96 :         let mut updates = self.layer_map.batch_update();
     444           96 :         for (old_layer, new_layer) in rewrite_layers {
     445            0 :             debug_assert_eq!(
     446            0 :                 old_layer.layer_desc().key_range,
     447            0 :                 new_layer.layer_desc().key_range
     448              :             );
     449            0 :             debug_assert_eq!(
     450            0 :                 old_layer.layer_desc().lsn_range,
     451            0 :                 new_layer.layer_desc().lsn_range
     452              :             );
     453              : 
     454              :             // Transfer visibility hint from old to new layer, since the new layer covers the same key space.  This is not guaranteed to
     455              :             // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
     456              :             // always marking rewritten layers as visible.
     457            0 :             new_layer.as_ref().set_visibility(old_layer.visibility());
     458            0 : 
     459            0 :             // Safety: we may never rewrite the same file in-place.  Callers are responsible
     460            0 :             // for ensuring that they only rewrite layers after something changes the path,
     461            0 :             // such as an increment in the generation number.
     462            0 :             assert_ne!(old_layer.local_path(), new_layer.local_path());
     463              : 
     464            0 :             Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
     465            0 : 
     466            0 :             Self::insert_historic_layer(
     467            0 :                 new_layer.as_ref().clone(),
     468            0 :                 &mut updates,
     469            0 :                 &mut self.layer_fmgr,
     470            0 :             );
     471            0 : 
     472            0 :             metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
     473              :         }
     474          304 :         for l in drop_layers {
     475          208 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     476          208 :         }
     477          204 :         for l in add_layers {
     478          108 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     479          108 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     480          108 :         }
     481           96 :         updates.flush();
     482           96 :     }
     483              : 
     484              :     /// Called when garbage collect has selected the layers to be removed.
     485           16 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     486           16 :         let mut updates = self.layer_map.batch_update();
     487           36 :         for doomed_layer in gc_layers {
     488           20 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     489           20 :         }
     490           16 :         updates.flush()
     491           16 :     }
     492              : 
     493              :     #[cfg(test)]
     494          320 :     pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
     495          320 :         let mut updates = self.layer_map.batch_update();
     496          320 :         Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     497          320 :         updates.flush()
     498          320 :     }
     499              : 
     500              :     /// Helper function to insert a layer into the layer map and file manager.
     501         3516 :     fn insert_historic_layer(
     502         3516 :         layer: Layer,
     503         3516 :         updates: &mut BatchedUpdates<'_>,
     504         3516 :         mapping: &mut LayerFileManager<Layer>,
     505         3516 :     ) {
     506         3516 :         updates.insert_historic(layer.layer_desc().clone());
     507         3516 :         mapping.insert(layer);
     508         3516 :     }
     509              : 
     510              :     /// Removes the layer from local FS (if present) and from memory.
     511              :     /// Remote storage is not affected by this operation.
     512         1032 :     fn delete_historic_layer(
     513         1032 :         // we cannot remove layers otherwise, since gc and compaction will race
     514         1032 :         layer: &Layer,
     515         1032 :         updates: &mut BatchedUpdates<'_>,
     516         1032 :         mapping: &mut LayerFileManager<Layer>,
     517         1032 :     ) {
     518         1032 :         let desc = layer.layer_desc();
     519         1032 : 
     520         1032 :         // TODO Removing from the bottom of the layer map is expensive.
     521         1032 :         //      Maybe instead discard all layer map historic versions that
     522         1032 :         //      won't be needed for page reconstruction for this timeline,
     523         1032 :         //      and mark what we can't delete yet as deleted from the layer
     524         1032 :         //      map index without actually rebuilding the index.
     525         1032 :         updates.remove_historic(desc);
     526         1032 :         mapping.remove(layer);
     527         1032 :         layer.delete_on_drop();
     528         1032 :     }
     529              : 
     530              :     #[cfg(test)]
     531            8 :     pub(crate) fn force_insert_in_memory_layer(&mut self, layer: Arc<InMemoryLayer>) {
     532              :         use pageserver_api::models::InMemoryLayerInfo;
     533              : 
     534            8 :         match layer.info() {
     535              :             InMemoryLayerInfo::Open { .. } => {
     536            4 :                 assert!(self.layer_map.open_layer.is_none());
     537            4 :                 self.layer_map.open_layer = Some(layer);
     538              :             }
     539            4 :             InMemoryLayerInfo::Frozen { lsn_start, .. } => {
     540            4 :                 if let Some(last) = self.layer_map.frozen_layers.back() {
     541            0 :                     assert!(last.get_lsn_range().end <= lsn_start);
     542            4 :                 }
     543              : 
     544            4 :                 self.layer_map.frozen_layers.push_back(layer);
     545              :             }
     546              :         }
     547            8 :     }
     548              : }
     549              : 
     550              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     551              : 
     552              : impl<T> Default for LayerFileManager<T> {
     553          924 :     fn default() -> Self {
     554          924 :         Self(HashMap::default())
     555          924 :     }
     556              : }
     557              : 
     558              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     559         3516 :     pub(crate) fn insert(&mut self, layer: T) {
     560         3516 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     561         3516 :         if present.is_some() && cfg!(debug_assertions) {
     562            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     563         3516 :         }
     564         3516 :     }
     565              : 
     566         1032 :     pub(crate) fn remove(&mut self, layer: &T) {
     567         1032 :         let present = self.0.remove(&layer.layer_desc().key());
     568         1032 :         if present.is_none() && cfg!(debug_assertions) {
     569            0 :             panic!(
     570            0 :                 "removing layer that is not present in layer mapping: {:?}",
     571            0 :                 layer.layer_desc()
     572            0 :             )
     573         1032 :         }
     574         1032 :     }
     575              : }
        

Generated by: LCOV version 2.1-beta